diff --git a/.gitmodules b/.gitmodules index 5298fd26a6..616e252a93 100644 --- a/.gitmodules +++ b/.gitmodules @@ -14,3 +14,6 @@ path = tests url = https://github.com/taosdata/tests branch = 3.0 +[submodule "examples/rust"] + path = examples/rust + url = https://github.com/songtianyi/tdengine-rust-bindings.git diff --git a/cmake/bdb_CMakeLists.txt.in b/cmake/bdb_CMakeLists.txt.in index 651a657c0d..52cef74db6 100644 --- a/cmake/bdb_CMakeLists.txt.in +++ b/cmake/bdb_CMakeLists.txt.in @@ -10,4 +10,4 @@ ExternalProject_Add(bdb BUILD_COMMAND "$(MAKE)" INSTALL_COMMAND "" TEST_COMMAND "" -) \ No newline at end of file +) diff --git a/contrib/test/bdb/CMakeLists.txt b/contrib/test/bdb/CMakeLists.txt index 62da0d4ac8..8dd84f7107 100644 --- a/contrib/test/bdb/CMakeLists.txt +++ b/contrib/test/bdb/CMakeLists.txt @@ -4,4 +4,4 @@ target_sources( "bdbTest.c" ) -target_link_libraries(bdbTest bdb) \ No newline at end of file +target_link_libraries(bdbTest bdb) diff --git a/contrib/test/bdb/bdbTest.c b/contrib/test/bdb/bdbTest.c index cbdc07be91..ddcba23b1f 100644 --- a/contrib/test/bdb/bdbTest.c +++ b/contrib/test/bdb/bdbTest.c @@ -185,4 +185,4 @@ static int idx_callback(DB * sdbp, /* secondary db handle */ skeyp->data = tmpdbt; return 0; -} \ No newline at end of file +} diff --git a/example/src/tmq.c b/example/src/tmq.c index d8c90a0127..ac7098b254 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -28,7 +28,7 @@ int32_t init_env() { return -1; } - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/examples/rust b/examples/rust new file mode 160000 index 0000000000..1c8924dc66 --- /dev/null +++ b/examples/rust @@ -0,0 +1 @@ +Subproject commit 1c8924dc668e6aa848214c2fc54e3ace3f5bf8df diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7337f1afb8..b8ed29b2a5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -52,20 +52,20 @@ extern char* tMsgInfo[]; extern int tMsgDict[]; #define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8) -#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) -#define TMSG_INFO(TYPE) tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] -#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) +#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) +#define TMSG_INFO(TYPE) tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] +#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) typedef uint16_t tmsg_t; /* ------------------------ OTHER DEFINITIONS ------------------------ */ // IE type -#define TSDB_IE_TYPE_SEC 1 -#define TSDB_IE_TYPE_META 2 -#define TSDB_IE_TYPE_MGMT_IP 3 -#define TSDB_IE_TYPE_DNODE_CFG 4 +#define TSDB_IE_TYPE_SEC 1 +#define TSDB_IE_TYPE_META 2 +#define TSDB_IE_TYPE_MGMT_IP 3 +#define TSDB_IE_TYPE_DNODE_CFG 4 #define TSDB_IE_TYPE_NEW_VERSION 5 -#define TSDB_IE_TYPE_DNODE_EXT 6 +#define TSDB_IE_TYPE_DNODE_EXT 6 #define TSDB_IE_TYPE_DNODE_STATE 7 typedef enum { @@ -100,7 +100,7 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_STREAMS, TSDB_MGMT_TABLE_VARIABLES, TSDB_MGMT_TABLE_CONNS, - TSDB_MGMT_TABLE_SCORES, + TSDB_MGMT_TABLE_TRANS, TSDB_MGMT_TABLE_GRANTS, TSDB_MGMT_TABLE_VNODES, TSDB_MGMT_TABLE_CLUSTER, @@ -110,53 +110,53 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_MAX, } EShowType; -#define TSDB_ALTER_TABLE_ADD_TAG 1 -#define TSDB_ALTER_TABLE_DROP_TAG 2 +#define TSDB_ALTER_TABLE_ADD_TAG 1 +#define TSDB_ALTER_TABLE_DROP_TAG 2 #define TSDB_ALTER_TABLE_UPDATE_TAG_NAME 3 -#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 +#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 -#define TSDB_ALTER_TABLE_ADD_COLUMN 5 -#define TSDB_ALTER_TABLE_DROP_COLUMN 6 +#define TSDB_ALTER_TABLE_ADD_COLUMN 5 +#define TSDB_ALTER_TABLE_DROP_COLUMN 6 #define TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES 7 -#define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES 8 +#define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES 8 -#define TSDB_FILL_NONE 0 -#define TSDB_FILL_NULL 1 +#define TSDB_FILL_NONE 0 +#define TSDB_FILL_NULL 1 #define TSDB_FILL_SET_VALUE 2 -#define TSDB_FILL_LINEAR 3 -#define TSDB_FILL_PREV 4 -#define TSDB_FILL_NEXT 5 +#define TSDB_FILL_LINEAR 3 +#define TSDB_FILL_PREV 4 +#define TSDB_FILL_NEXT 5 -#define TSDB_ALTER_USER_PASSWD 0x1 -#define TSDB_ALTER_USER_SUPERUSER 0x2 -#define TSDB_ALTER_USER_ADD_READ_DB 0x3 -#define TSDB_ALTER_USER_REMOVE_READ_DB 0x4 -#define TSDB_ALTER_USER_CLEAR_READ_DB 0x5 -#define TSDB_ALTER_USER_ADD_WRITE_DB 0x6 +#define TSDB_ALTER_USER_PASSWD 0x1 +#define TSDB_ALTER_USER_SUPERUSER 0x2 +#define TSDB_ALTER_USER_ADD_READ_DB 0x3 +#define TSDB_ALTER_USER_REMOVE_READ_DB 0x4 +#define TSDB_ALTER_USER_CLEAR_READ_DB 0x5 +#define TSDB_ALTER_USER_ADD_WRITE_DB 0x6 #define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x7 -#define TSDB_ALTER_USER_CLEAR_WRITE_DB 0x8 +#define TSDB_ALTER_USER_CLEAR_WRITE_DB 0x8 #define TSDB_ALTER_USER_PRIVILEGES 0x2 #define TSDB_KILL_MSG_LEN 30 -#define TSDB_VN_READ_ACCCESS ((char)0x1) +#define TSDB_VN_READ_ACCCESS ((char)0x1) #define TSDB_VN_WRITE_ACCCESS ((char)0x2) -#define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS) +#define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS) #define TSDB_COL_NORMAL 0x0u // the normal column of the table -#define TSDB_COL_TAG 0x1u // the tag column type -#define TSDB_COL_UDC 0x2u // the user specified normal string column, it is a dummy column -#define TSDB_COL_TMP 0x4u // internal column generated by the previous operators -#define TSDB_COL_NULL 0x8u // the column filter NULL or not +#define TSDB_COL_TAG 0x1u // the tag column type +#define TSDB_COL_UDC 0x2u // the user specified normal string column, it is a dummy column +#define TSDB_COL_TMP 0x4u // internal column generated by the previous operators +#define TSDB_COL_NULL 0x8u // the column filter NULL or not -#define TSDB_COL_IS_TAG(f) (((f & (~(TSDB_COL_NULL))) & TSDB_COL_TAG) != 0) +#define TSDB_COL_IS_TAG(f) (((f & (~(TSDB_COL_NULL))) & TSDB_COL_TAG) != 0) #define TSDB_COL_IS_NORMAL_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_NORMAL) -#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) -#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) +#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) +#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) -#define TD_SUPER_TABLE TSDB_SUPER_TABLE -#define TD_CHILD_TABLE TSDB_CHILD_TABLE +#define TD_SUPER_TABLE TSDB_SUPER_TABLE +#define TD_CHILD_TABLE TSDB_CHILD_TABLE #define TD_NORMAL_TABLE TSDB_NORMAL_TABLE typedef struct { @@ -1083,11 +1083,11 @@ typedef struct { } STaskDropRsp; typedef struct { - char name[TSDB_TOPIC_FNAME_LEN]; - int8_t igExists; - char* sql; - char* physicalPlan; - char* logicalPlan; + char name[TSDB_TOPIC_FNAME_LEN]; + int8_t igExists; + char* sql; + char* physicalPlan; + char* logicalPlan; } SMCreateTopicReq; int32_t tSerializeMCreateTopicReq(void* buf, int32_t bufLen, const SMCreateTopicReq* pReq); @@ -1733,7 +1733,7 @@ typedef struct { int32_t vgId; int64_t consumerId; char topicName[TSDB_TOPIC_FNAME_LEN]; - char cGroup[TSDB_CONSUMER_GROUP_LEN]; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; } SMqSetCVgRsp; typedef struct { @@ -1741,9 +1741,42 @@ typedef struct { int32_t vgId; int64_t consumerId; char topicName[TSDB_TOPIC_FNAME_LEN]; - char cGroup[TSDB_CONSUMER_GROUP_LEN]; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; } SMqMVRebRsp; +typedef struct { + int32_t vgId; + int64_t offset; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; +} SMqOffset; + +typedef struct { + int32_t num; + SMqOffset* offsets; +} SMqCMResetOffsetReq; + +typedef struct { + int32_t reserved; +} SMqCMResetOffsetRsp; + +typedef struct { + int32_t num; + SMqOffset* offsets; +} SMqMVResetOffsetReq; + +typedef struct { + int32_t reserved; +} SMqMVResetOffsetRsp; + +int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset); +int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset); +int32_t tEncodeSMqCMResetOffsetReq(SCoder* encoder, const SMqCMResetOffsetReq* pReq); +int32_t tDecodeSMqCMResetOffsetReq(SCoder* decoder, SMqCMResetOffsetReq* pReq); + +int32_t tEncodeSMqMVResetOffsetReq(SCoder* encoder, const SMqMVResetOffsetReq* pReq); +int32_t tDecodeSMqMVResetOffsetReq(SCoder* decoder, SMqMVResetOffsetReq* pReq); + typedef struct { uint32_t nCols; SSchema* pSchema; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index ca30e682bf..9e5613ef03 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -142,6 +142,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) + TD_DEF_MSG_TYPE(TDMT_MND_RESET_OFFSET, "mnode-reset-offset", SMqCMResetOffsetReq, SMqCMResetOffsetRsp) TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 068b80cf59..1ea1f0316a 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -100,7 +100,7 @@ SNode* nodesMakeNode(ENodeType type); void nodesDestroyNode(SNode* pNode); SNodeList* nodesMakeList(); -SNodeList* nodesListAppend(SNodeList* pList, SNode* pNode); +int32_t nodesListAppend(SNodeList* pList, SNode* pNode); SListCell* nodesListErase(SNodeList* pList, SListCell* pCell); SNode* nodesListGetNode(SNodeList* pList, int32_t index); void nodesDestroyList(SNodeList* pList); diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index de6f8747e8..4d7a474e6d 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -50,6 +50,7 @@ typedef enum EColumnType { typedef struct SColumnNode { SExprNode node; // QUERY_NODE_COLUMN + uint64_t tableId; int16_t colId; EColumnType colType; // column or tag char dbName[TSDB_DB_NAME_LEN]; @@ -59,10 +60,11 @@ typedef struct SColumnNode { SNode* pProjectRef; } SColumnNode; -typedef struct SColumnRef { +typedef struct SColumnRefNode { ENodeType type; + int32_t tupleId; int32_t slotId; -} SColumnRef; +} SColumnRefNode; typedef struct SValueNode { SExprNode node; // QUERY_NODE_VALUE @@ -269,6 +271,25 @@ typedef struct SSetOperator { SNode* pLimit; } SSetOperator; +typedef enum ESqlClause { + SQL_CLAUSE_FROM = 1, + SQL_CLAUSE_WHERE, + SQL_CLAUSE_PARTITION_BY, + SQL_CLAUSE_WINDOW, + SQL_CLAUSE_GROUP_BY, + SQL_CLAUSE_HAVING, + SQL_CLAUSE_SELECT, + SQL_CLAUSE_ORDER_BY +} ESqlClause; + +void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext); +void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext); + +int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, uint64_t tableId, bool realCol, SNodeList** pCols); + +typedef bool (*FFuncClassifier)(int32_t funcId); +int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNodeList** pFuncs); + bool nodesIsExprNode(const SNode* pNode); bool nodesIsArithmeticOp(const SOperatorNode* pOp); diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 1b70c78af9..76f23da1e3 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -113,6 +113,7 @@ typedef struct STableMetaOutput { typedef struct SDataBuf { void *pData; uint32_t len; + void *handle; } SDataBuf; typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 5c12d29b50..0a822927ad 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -459,7 +459,6 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION TAOS_DEF_ERROR_CODE(0, 0x260A) //Not a GROUP BY expression #define TSDB_CODE_PAR_NOT_SELECTED_EXPRESSION TAOS_DEF_ERROR_CODE(0, 0x260B) //Not SELECTed expression #define TSDB_CODE_PAR_NOT_SINGLE_GROUP TAOS_DEF_ERROR_CODE(0, 0x260C) //Not a single-group group function -#define TSDB_CODE_PAR_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x260D) //Out of memory #ifdef __cplusplus } diff --git a/include/util/tdef.h b/include/util/tdef.h index 292a3126c8..23f52577de 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -216,6 +216,10 @@ do { \ #define TSDB_SHOW_SUBQUERY_LEN 1000 #define TSDB_SLOW_QUERY_SQL_LEN 512 +#define TSDB_TRANS_STAGE_LEN 12 +#define TSDB_TRANS_DESC_LEN 16 +#define TSDB_TRANS_ERROR_LEN 128 + #define TSDB_STEP_NAME_LEN 32 #define TSDB_STEP_DESC_LEN 128 diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 73a09f473e..45c1858948 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -395,8 +395,10 @@ static void* hbThreadFunc(void* param) { hbClearReqInfo(pAppHbMgr); break; } + tSerializeSClientHbBatchReq(buf, tlen, pReq); - SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo)); + SMsgSendInfo *pInfo = calloc(1, sizeof(SMsgSendInfo)); + if (pInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; tFreeClientHbBatchReq(pReq, false); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f3bbb9481b..2eff353c39 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -12,10 +12,10 @@ #include "tpagedbuf.h" #include "tref.h" -static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); -static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest); -static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); -static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); +static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); +static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); +static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); +static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); static bool stringLengthCheck(const char* str, size_t maxsize) { if (str == NULL) { @@ -30,17 +30,11 @@ static bool stringLengthCheck(const char* str, size_t maxsize) { return true; } -static bool validateUserName(const char* user) { - return stringLengthCheck(user, TSDB_USER_LEN - 1); -} +static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); } -static bool validatePassword(const char* passwd) { - return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1); -} +static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1); } -static bool validateDbName(const char* db) { - return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); -} +static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); } static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) { char key[512] = {0}; @@ -48,10 +42,12 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i return strdup(key); } -static STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo); -static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); +static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, + SAppInstInfo* pAppInfo); +static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); -TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) { +TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, + uint16_t port) { if (taos_init() != TSDB_CODE_SUCCESS) { return NULL; } @@ -63,7 +59,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, char localDb[TSDB_DB_NAME_LEN] = {0}; if (db != NULL) { - if(!validateDbName(db)) { + if (!validateDbName(db)) { terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH; return NULL; } @@ -79,7 +75,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, return NULL; } - taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt); + taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt); } else { tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt)); } @@ -99,7 +95,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, } } - char* key = getClusterKey(user, secretEncrypt, ip, port); + char* key = getClusterKey(user, secretEncrypt, ip, port); SAppInstInfo** pInst = NULL; pthread_mutex_lock(&appInfo.mutex); @@ -108,7 +104,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, SAppInstInfo* p = NULL; if (pInst == NULL) { p = calloc(1, sizeof(struct SAppInstInfo)); - p->mgmtEp = epSet; + p->mgmtEp = epSet; p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); p->pAppHbMgr = appHbMgrInit(p, key); taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); @@ -122,7 +118,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst); } -int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) { +int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) { *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT); if (*pRequest == NULL) { tscError("failed to malloc sqlObj"); @@ -131,7 +127,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj* (*pRequest)->sqlstr = malloc(sqlLen + 1); if ((*pRequest)->sqlstr == NULL) { - tscError("0x%"PRIx64" failed to prepare sql string buffer", (*pRequest)->self); + tscError("0x%" PRIx64 " failed to prepare sql string buffer", (*pRequest)->self); (*pRequest)->msgBuf = strdup("failed to prepare sql string buffer"); return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -140,7 +136,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj* (*pRequest)->sqlstr[sqlLen] = 0; (*pRequest)->sqlLen = sqlLen; - tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x%"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); + tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); return TSDB_CODE_SUCCESS; } @@ -148,14 +144,14 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) { STscObj* pTscObj = pRequest->pTscObj; SParseContext cxt = { - .requestId = pRequest->requestId, - .acctId = pTscObj->acctId, - .db = getDbOfConnection(pTscObj), - .pSql = pRequest->sqlstr, - .sqlLen = pRequest->sqlLen, - .pMsg = pRequest->msgBuf, - .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, - .pTransporter = pTscObj->pAppInfo->pTransporter, + .requestId = pRequest->requestId, + .acctId = pTscObj->acctId, + .db = getDbOfConnection(pTscObj), + .pSql = pRequest->sqlstr, + .sqlLen = pRequest->sqlLen, + .pMsg = pRequest->msgBuf, + .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, + .pTransporter = pTscObj->pAppInfo->pTransporter, }; cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); @@ -174,9 +170,9 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) { int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery; pRequest->type = pDcl->msgType; - pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen}; + pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen, .handle = NULL}; - STscObj* pTscObj = pRequest->pTscObj; + STscObj* pTscObj = pRequest->pTscObj; SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); int64_t transporterId = 0; @@ -202,7 +198,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag, SSchema* pSchema = NULL; int32_t numOfCols = 0; - int32_t code = qCreateQueryDag(pQueryNode, pDag, &pSchema, &numOfCols, pNodeList, pRequest->requestId); + int32_t code = qCreateQueryDag(pQueryNode, pDag, &pSchema, &numOfCols, pNodeList, pRequest->requestId); if (code != 0) { return code; } @@ -224,7 +220,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { pResInfo->fields[i].bytes = pSchema[i].bytes; - pResInfo->fields[i].type = pSchema[i].type; + pResInfo->fields[i].type = pSchema[i].type; tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name)); } } @@ -233,7 +229,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; - int32_t code = schedulerExecJob(pTransporter, NULL, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res); + int32_t code = schedulerExecJob(pTransporter, NULL, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res); if (code != TSDB_CODE_SUCCESS) { // handle error and retry } else { @@ -250,9 +246,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) return schedulerAsyncExecJob(pTransporter, pNodeList, pDag, pRequest->sqlstr, &pRequest->body.pQueryJob); } -TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { - STscObj *pTscObj = (STscObj *)taos; - if (sqlLen > (size_t) TSDB_MAX_ALLOWED_SQL_LEN) { +TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) { + STscObj* pTscObj = (STscObj*)taos; + if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; return NULL; @@ -260,9 +256,9 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { nPrintTsc("%s", sql) - SRequestObj *pRequest = NULL; - SQueryNode *pQueryNode = NULL; - SArray *pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); + SRequestObj* pRequest = NULL; + SQueryNode* pQueryNode = NULL; + SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); terrno = TSDB_CODE_SUCCESS; CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); @@ -286,13 +282,13 @@ _return: return pRequest; } -int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) { +int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) { pEpSet->version = 0; // init mnode ip set - SEpSet *mgmtEpSet = &(pEpSet->epSet); + SEpSet* mgmtEpSet = &(pEpSet->epSet); mgmtEpSet->numOfEps = 0; - mgmtEpSet->inUse = 0; + mgmtEpSet->inUse = 0; if (firstEp && firstEp[0] != 0) { if (strlen(firstEp) >= TSDB_EP_LEN) { @@ -322,14 +318,15 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe return 0; } -STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) { - STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo); +STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, + SAppInstInfo* pAppInfo) { + STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo); if (NULL == pTscObj) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return pTscObj; } - SRequestObj *pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT); + SRequestObj* pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT); if (pRequest == NULL) { destroyTscObj(pTscObj); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -343,14 +340,16 @@ STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __t tsem_wait(&pRequest->body.rspSem); if (pRequest->code != TSDB_CODE_SUCCESS) { - const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code); + const char* errorMsg = + (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code); printf("failed to connect to server, reason: %s\n\n", errorMsg); destroyRequest(pRequest); taos_close(pTscObj); pTscObj = NULL; } else { - tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId); + tscDebug("0x%" PRIx64 " connection is opening, connId:%d, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id, + pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId); destroyRequest(pRequest); } @@ -365,11 +364,13 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { } pMsgSendInfo->msgType = TDMT_MND_CONNECT; + pMsgSendInfo->requestObjRefId = pRequest->self; pMsgSendInfo->requestId = pRequest->requestId; pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; pMsgSendInfo->param = pRequest; + SConnectReq connectReq = {0}; STscObj* pObj = pRequest->pTscObj; @@ -399,17 +400,17 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { } void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { - SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle; + SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle; assert(pMsg->ahandle != NULL); if (pSendInfo->requestObjRefId != 0) { - SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); + SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); assert(pRequest->self == pSendInfo->requestObjRefId); pRequest->metric.rsp = taosGetTimestampMs(); pRequest->code = pMsg->code; - STscObj *pTscObj = pRequest->pTscObj; + STscObj* pTscObj = pRequest->pTscObj; if (pEpSet) { if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) { updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet); @@ -417,22 +418,22 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { } /* - * There is not response callback function for submit response. - * The actual inserted number of points is the first number. + * There is not response callback function for submit response. + * The actual inserted number of points is the first number. */ int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start; if (pMsg->code == TSDB_CODE_SUCCESS) { - tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->self, - TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); + tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self, + TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); } else { - tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"PRIx64, pRequest->self, - TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); + tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self, + TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); } taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); } - SDataBuf buf = {.len = pMsg->contLen, .pData = NULL}; + SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->handle}; if (pMsg->contLen > 0) { buf.pData = calloc(1, pMsg->contLen); @@ -449,7 +450,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { destroySendMsgInfo(pSendInfo); } -TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) { +TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) { tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db); if (user == NULL) { user = TSDB_DEFAULT_USER; @@ -463,16 +464,17 @@ TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, cons return taos_connect_internal(ip, user, NULL, auth, db, port); } -TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) { - char ipStr[TSDB_EP_LEN] = {0}; +TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen, + const char* db, int dbLen, uint16_t port) { + char ipStr[TSDB_EP_LEN] = {0}; char dbStr[TSDB_DB_NAME_LEN] = {0}; - char userStr[TSDB_USER_LEN] = {0}; - char passStr[TSDB_PASSWORD_LEN] = {0}; + char userStr[TSDB_USER_LEN] = {0}; + char passStr[TSDB_PASSWORD_LEN] = {0}; - strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen)); + strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen)); strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen)); strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen)); - strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen)); + strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen)); return taos_connect(ipStr, userStr, passStr, dbStr, port); } @@ -490,15 +492,15 @@ void* doFetchRow(SRequestObj* pRequest) { } SReqResultInfo* pResInfo = &pRequest->body.resInfo; - int32_t code = schedulerFetchRows(pRequest->body.pQueryJob, (void **)&pResInfo->pData); + int32_t code = schedulerFetchRows(pRequest->body.pQueryJob, (void**)&pResInfo->pData); if (code != TSDB_CODE_SUCCESS) { pRequest->code = code; return NULL; } setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData); - tscDebug("0x%"PRIx64 " fetch results, numOfRows:%d total Rows:%"PRId64", complete:%d, reqId:0x%"PRIx64, pRequest->self, pResInfo->numOfRows, - pResInfo->totalRows, pResInfo->completed, pRequest->requestId); + tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, + pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId); if (pResultInfo->numOfRows == 0) { return NULL; @@ -511,7 +513,7 @@ void* doFetchRow(SRequestObj* pRequest) { } else if (pRequest->type == TDMT_VND_SHOW_TABLES) { pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo; - SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex); + SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex); epSet = pVgroupInfo->epset; } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) { @@ -522,7 +524,7 @@ void* doFetchRow(SRequestObj* pRequest) { return NULL; } - SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex); + SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex); SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq)); pShowReq->head.vgId = htonl(pVgroupInfo->vgId); @@ -533,14 +535,14 @@ void* doFetchRow(SRequestObj* pRequest) { epSet = pVgroupInfo->epset; int64_t transporterId = 0; - STscObj *pTscObj = pRequest->pTscObj; + STscObj* pTscObj = pRequest->pTscObj; asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; - } else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) { + } else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) { epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); - + if (pResultInfo->completed) { return NULL; } @@ -549,7 +551,7 @@ void* doFetchRow(SRequestObj* pRequest) { SMsgSendInfo* body = buildMsgInfoImpl(pRequest); int64_t transporterId = 0; - STscObj *pTscObj = pRequest->pTscObj; + STscObj* pTscObj = pRequest->pTscObj; asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); @@ -562,7 +564,7 @@ void* doFetchRow(SRequestObj* pRequest) { _return: - for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) { + for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) { pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current; if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { pResultInfo->length[i] = varDataLen(pResultInfo->row[i]); @@ -576,8 +578,8 @@ _return: static void doPrepareResPtr(SReqResultInfo* pResInfo) { if (pResInfo->row == NULL) { - pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES); - pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES); + pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES); + pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES); pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t)); } } @@ -594,14 +596,14 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t int32_t offset = 0; for (int32_t i = 0; i < numOfCols; ++i) { pResultInfo->length[i] = pResultInfo->fields[i].bytes; - pResultInfo->row[i] = (char*) (pResultInfo->pData + offset * pResultInfo->numOfRows); - pResultInfo->pCol[i] = pResultInfo->row[i]; + pResultInfo->row[i] = (char*)(pResultInfo->pData + offset * pResultInfo->numOfRows); + pResultInfo->pCol[i] = pResultInfo->row[i]; offset += pResultInfo->fields[i].bytes; } } char* getDbOfConnection(STscObj* pObj) { - char *p = NULL; + char* p = NULL; pthread_mutex_lock(&pObj->mutex); size_t len = strlen(pObj->db); if (len > 0) { @@ -622,10 +624,10 @@ void setConnectionDB(STscObj* pTscObj, const char* db) { void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) { assert(pResultInfo != NULL && pRsp != NULL); - pResultInfo->pRspMsg = (const char*) pRsp; - pResultInfo->pData = (void*) pRsp->data; + pResultInfo->pRspMsg = (const char*)pRsp; + pResultInfo->pData = (void*)pRsp->data; pResultInfo->numOfRows = htonl(pRsp->numOfRows); - pResultInfo->current = 0; + pResultInfo->current = 0; pResultInfo->completed = (pRsp->completed == 1); pResultInfo->totalRows += pResultInfo->numOfRows; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 48c9920c0c..454fb7f1fe 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -97,9 +97,9 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) { int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &retrieveReq); void* pReq = malloc(contLen); tSerializeSRetrieveTableReq(pReq, contLen, &retrieveReq); - pMsgSendInfo->msgInfo.pData = pReq; pMsgSendInfo->msgInfo.len = contLen; + pMsgSendInfo->msgInfo.handle = NULL; } else { SVShowTablesFetchReq* pFetchMsg = calloc(1, sizeof(SVShowTablesFetchReq)); if (pFetchMsg == NULL) { @@ -111,6 +111,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) { pMsgSendInfo->msgInfo.pData = pFetchMsg; pMsgSendInfo->msgInfo.len = sizeof(SVShowTablesFetchReq); + pMsgSendInfo->msgInfo.handle = NULL; } } else { assert(pRequest != NULL); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 419d6294a7..e602cee302 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -35,9 +35,7 @@ struct tmq_list_t { }; struct tmq_topic_vgroup_t { - char* topic; - int32_t vgId; - int64_t offset; + SMqOffset offset; }; struct tmq_topic_vgroup_list_t { @@ -123,6 +121,12 @@ typedef struct { tsem_t rspSem; } SMqCommitCbParam; +typedef struct { + tmq_t* tmq; + tsem_t rspSem; + tmq_resp_err_t rspErr; +} SMqResetOffsetParam; + tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); conf->auto_commit = false; @@ -173,12 +177,6 @@ int32_t tmq_list_append(tmq_list_t* ptr, const char* src) { return 0; } -tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) { - // build msg - // send to mnode - return TMQ_RESP_ERR__SUCCESS; -} - int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; pParam->rspErr = code; @@ -196,6 +194,13 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } +int32_t tmqResetOffsetCb(void* param, const SDataBuf* pMsg, int32_t code) { + SMqResetOffsetParam* pParam = (SMqResetOffsetParam*)param; + pParam->rspErr = code; + tsem_post(&pParam->rspSem); + return 0; +} + tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* pTmq = calloc(sizeof(tmq_t), 1); if (pTmq == NULL) { @@ -218,6 +223,55 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs return pTmq; } +tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) { + SRequestObj* pRequest = NULL; + // build msg + // send to mnode + SMqCMResetOffsetReq req; + req.num = offsets->cnt; + req.offsets = (SMqOffset*)offsets->elems; + + SCoder encoder; + + tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); + tEncodeSMqCMResetOffsetReq(&encoder, &req); + int32_t tlen = encoder.pos; + void* buf = malloc(tlen); + if (buf == NULL) { + tCoderClear(&encoder); + return -1; + } + tCoderClear(&encoder); + + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER); + tEncodeSMqCMResetOffsetReq(&encoder, &req); + tCoderClear(&encoder); + + pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_RESET_OFFSET); + if (pRequest == NULL) { + tscError("failed to malloc request"); + } + + SMqResetOffsetParam param = {0}; + tsem_init(¶m.rspSem, 0, 0); + param.tmq = tmq; + + pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; + + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->param = ¶m; + sendInfo->fp = tmqResetOffsetCb; + SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); + + int64_t transporterId = 0; + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + + tsem_wait(¶m.rspSem); + tsem_destroy(¶m.rspSem); + + return TMQ_RESP_ERR__SUCCESS; +} + tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { SRequestObj* pRequest = NULL; int32_t sz = topic_list->cnt; @@ -244,6 +298,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { char* topicFname = calloc(1, TSDB_TOPIC_FNAME_LEN); if (topicFname == NULL) { + goto _return; } tNameExtractFullName(&name, topicFname); tscDebug("subscribe topic: %s", topicFname); @@ -251,9 +306,6 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { .nextVgIdx = 0, .sql = NULL, .sqlLen = 0, .topicId = 0, .topicName = topicFname, .vgs = NULL}; topic.vgs = taosArrayInit(0, sizeof(SMqClientVg)); taosArrayPush(tmq->clientTopics, &topic); - /*SMqClientTopic topic = {*/ - /*.*/ - /*};*/ taosArrayPush(req.topicNames, &topicFname); free(dbName); } @@ -270,13 +322,13 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE); if (pRequest == NULL) { - tscError("failed to malloc sqlObj"); + tscError("failed to malloc request"); } SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq}; tsem_init(¶m.rspSem, 0, 0); - pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; + pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL}; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->param = ¶m; @@ -401,7 +453,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i tSerializeMCreateTopicReq(buf, tlen, &req); /*printf("formatted: %s\n", dagStr);*/ - pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; + pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL}; pRequest->type = TDMT_MND_CREATE_TOPIC; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); @@ -727,8 +779,10 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { param->pVg = pVg; tsem_init(¶m->rspSem, 0, 0); + SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); - pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)}; + pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL}; + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->requestObjRefId = 0; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 3806227803..4b6171362e 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2306,3 +2306,57 @@ int32_t tDeserializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) { tCoderClear(&decoder); return 0; } + +int32_t tEncodeSMqOffset(SCoder *encoder, const SMqOffset *pOffset) { + if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1; + if (tEncodeI64(encoder, pOffset->offset) < 0) return -1; + if (tEncodeCStr(encoder, pOffset->topicName) < 0) return -1; + if (tEncodeCStr(encoder, pOffset->cgroup) < 0) return -1; + return encoder->pos; +} + +int32_t tDecodeSMqOffset(SCoder *decoder, SMqOffset *pOffset) { + if (tDecodeI32(decoder, &pOffset->vgId) < 0) return -1; + if (tDecodeI64(decoder, &pOffset->offset) < 0) return -1; + if (tDecodeCStrTo(decoder, pOffset->topicName) < 0) return -1; + if (tDecodeCStrTo(decoder, pOffset->cgroup) < 0) return -1; + return 0; +} + +int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *pReq) { + if (tStartEncode(encoder) < 0) return -1; + if (tEncodeI32(encoder, pReq->num) < 0) return -1; + for (int32_t i = 0; i < pReq->num; i++) { + tEncodeSMqOffset(encoder, &pReq->offsets[i]); + } + tEndEncode(encoder); + return encoder->pos; +} + +int32_t tDecodeSMqCMResetOffsetReq(SCoder *decoder, SMqCMResetOffsetReq *pReq) { + if (tDecodeI32(decoder, &pReq->num) < 0) return -1; + pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder); + if (pReq->offsets == NULL) return -1; + for (int32_t i = 0; i < pReq->num; i++) { + tDecodeSMqOffset(decoder, &pReq->offsets[i]); + } + return 0; +} + +int32_t tEncodeSMqMVResetOffsetReq(SCoder *encoder, const SMqMVResetOffsetReq *pReq) { + if (tEncodeI32(encoder, pReq->num) < 0) return -1; + for (int32_t i = 0; i < pReq->num; i++) { + tEncodeSMqOffset(encoder, &pReq->offsets[i]); + } + return encoder->pos; +} + +int32_t tDecodeSMqMVResetOffsetReq(SCoder *decoder, SMqMVResetOffsetReq *pReq) { + if (tDecodeI32(decoder, &pReq->num) < 0) return -1; + pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder); + if (pReq->offsets == NULL) return -1; + for (int32_t i = 0; i < pReq->num; i++) { + tDecodeSMqOffset(decoder, &pReq->offsets[i]); + } + return 0; +} diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 73c0446e3f..abc802c588 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -104,6 +104,10 @@ typedef enum { TRN_STAGE_FINISHED = 8 } ETrnStage; +typedef enum { + TRN_TYPE_CREATE_DB = 0, +} ETrnType; + typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy; typedef enum { @@ -135,6 +139,12 @@ typedef struct { SArray* commitLogs; SArray* redoActions; SArray* undoActions; + int64_t createdTime; + int64_t lastExecTime; + int32_t transType; + uint64_t dbUid; + char dbname[TSDB_DB_NAME_LEN]; + char lastError[TSDB_TRANS_DESC_LEN]; } STrans; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index e772ff326c..eb5f39e983 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -914,6 +914,8 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { sdbRelease(pSdb, pVgroup); } + + sdbCancelFetch(pSdb, pIter); } static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 3198ded37e..afb338e97d 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -295,8 +295,8 @@ char *mndShowStr(int32_t showType) { return "show configs"; case TSDB_MGMT_TABLE_CONNS: return "show connections"; - case TSDB_MGMT_TABLE_SCORES: - return "show scores"; + case TSDB_MGMT_TABLE_TRANS: + return "show trans"; case TSDB_MGMT_TABLE_GRANTS: return "show grants"; case TSDB_MGMT_TABLE_VNODES: diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 53ffd8698f..c5b18bcde0 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -420,7 +420,10 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) continue; + if (pVgroup->dbUid != pDb->uid) { + sdbRelease(pSdb, pVgroup); + continue; + } void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen); if (pReq == NULL) { @@ -455,7 +458,10 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) continue; + if (pVgroup->dbUid != pDb->uid) { + sdbRelease(pSdb, pVgroup); + continue; + } int32_t contLen = 0; void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen); @@ -942,7 +948,10 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) continue; + if (pVgroup->dbUid != pDb->uid) { + sdbRelease(pSdb, pVgroup); + continue; + } void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen); if (pReq == NULL) { @@ -1116,7 +1125,10 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) continue; + if (pVgroup->dbUid != pDb->uid) { + sdbRelease(pSdb, pVgroup); + continue; + } int32_t contLen = 0; void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 840326d318..ceb31149ca 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -134,8 +134,6 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { ASSERT(pConsumerEp->oldConsumerId != -1); - int32_t vgId = pConsumerEp->vgId; - SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); void *buf; int32_t tlen; @@ -143,6 +141,9 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC return -1; } + int32_t vgId = pConsumerEp->vgId; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.pCont = buf; @@ -180,15 +181,15 @@ static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsum } static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { - int32_t vgId = pConsumerEp->vgId; - SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); - void *buf; int32_t tlen; if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp) < 0) { return -1; } + int32_t vgId = pConsumerEp->vgId; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.pCont = buf; @@ -345,6 +346,14 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); } } + if (status == MQ_CONSUMER_STATUS__MODIFY) { + int32_t removeSz = taosArrayGetSize(pConsumer->recentRemovedTopics); + for (int32_t i = 0; i < removeSz; i++) { + char *topicName = taosArrayGet(pConsumer->recentRemovedTopics, i); + free(topicName); + } + taosArrayClear(pConsumer->recentRemovedTopics); + } } } if (taosHashGetSize(pRebMsg->rebSubHash) != 0) { @@ -714,7 +723,10 @@ static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SM while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pTopic->dbUid) continue; + if (pVgroup->dbUid != pTopic->dbUid) { + sdbRelease(pSdb, pVgroup); + continue; + } pSub->vgNum++; plan->execNode.nodeId = pVgroup->vgId; @@ -748,7 +760,6 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT const SMqConsumerEp *pConsumerEp) { ASSERT(pConsumerEp->oldConsumerId == -1); int32_t vgId = pConsumerEp->vgId; - SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SMqSetCVgReq req = { .vgId = vgId, @@ -776,6 +787,8 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncodeSMqSetCVgReq(&abuf, &req); + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.pCont = buf; @@ -1013,6 +1026,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { break; } } + char *oldTopicNameDup = strdup(oldTopicName); + taosArrayPush(pConsumer->recentRemovedTopics, &oldTopicNameDup); atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY); /*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/ } else if (newTopicName != NULL) { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 3230074add..99a5a3a475 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -15,10 +15,13 @@ #define _DEFAULT_SOURCE #include "mndTrans.h" +#include "mndAuth.h" +#include "mndShow.h" #include "mndSync.h" +#include "mndUser.h" -#define MND_TRANS_VER_NUMBER 1 -#define MND_TRANS_ARRAY_SIZE 8 +#define MND_TRANS_VER_NUMBER 1 +#define MND_TRANS_ARRAY_SIZE 8 #define MND_TRANS_RESERVE_SIZE 64 static SSdbRaw *mndTransActionEncode(STrans *pTrans); @@ -51,7 +54,11 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); static void mndTransExecute(SMnode *pMnode, STrans *pTrans); static void mndTransSendRpcRsp(STrans *pTrans); -static int32_t mndProcessTransMsg(SMnodeMsg *pMsg); +static int32_t mndProcessTransReq(SMnodeMsg *pMsg); + +static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveTrans(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter); int32_t mndInitTrans(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_TRANS, @@ -62,7 +69,11 @@ int32_t mndInitTrans(SMnode *pMnode) { .updateFp = (SdbUpdateFp)mndTransActionUpdate, .deleteFp = (SdbDeleteFp)mndTransActionDelete}; - mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransMsg); + mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransReq); + + mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndGetTransMeta); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndCancelGetNextTrans); return sdbSetTable(pMnode->pSdb, table); } @@ -80,17 +91,17 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { for (int32_t i = 0; i < redoLogNum; ++i) { SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i); - rawDataLen += (sdbGetRawTotalSize(pTmp) + 4); + rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t)); } for (int32_t i = 0; i < undoLogNum; ++i) { SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i); - rawDataLen += (sdbGetRawTotalSize(pTmp) + 4); + rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t)); } for (int32_t i = 0; i < commitLogNum; ++i) { SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i); - rawDataLen += (sdbGetRawTotalSize(pTmp) + 4); + rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t)); } for (int32_t i = 0; i < redoActionNum; ++i) { @@ -296,9 +307,43 @@ TRANS_DECODE_OVER: return pRow; } +static const char *mndTransStr(ETrnStage stage) { + switch (stage) { + case TRN_STAGE_PREPARE: + return "prepare"; + case TRN_STAGE_REDO_LOG: + return "redoLog"; + case TRN_STAGE_REDO_ACTION: + return "redoAction"; + case TRN_STAGE_COMMIT: + return "commit"; + case TRN_STAGE_COMMIT_LOG: + return "commitLog"; + case TRN_STAGE_UNDO_ACTION: + return "undoAction"; + case TRN_STAGE_UNDO_LOG: + return "undoLog"; + case TRN_STAGE_ROLLBACK: + return "rollback"; + case TRN_STAGE_FINISHED: + return "finished"; + default: + return "invalid"; + } +} + +static const char *mndTransType(ETrnType type) { + switch (type) { + case TRN_TYPE_CREATE_DB: + return "create-db"; + default: + return "invalid"; + } +} + static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { - pTrans->stage = TRN_STAGE_PREPARE; - mTrace("trans:%d, perform insert action, row:%p", pTrans->id, pTrans); + // pTrans->stage = TRN_STAGE_PREPARE; + mTrace("trans:%d, perform insert action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage)); return 0; } @@ -309,28 +354,31 @@ static void mndTransDropData(STrans *pTrans) { mndTransDropActions(pTrans->redoActions); mndTransDropActions(pTrans->undoActions); if (pTrans->rpcRsp != NULL) { - rpcFreeCont(pTrans->rpcRsp); + free(pTrans->rpcRsp); pTrans->rpcRsp = NULL; pTrans->rpcRspLen = 0; } } static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { - mTrace("trans:%d, perform delete action, row:%p", pTrans->id, pTrans); + mTrace("trans:%d, perform delete action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage)); mndTransDropData(pTrans); return 0; } static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { - if (pNew->stage == TRN_STAGE_COMMIT) pNew->stage = TRN_STAGE_COMMIT_LOG; + if (pNew->stage == TRN_STAGE_COMMIT) { + pNew->stage = TRN_STAGE_COMMIT_LOG; + mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_LOG)); + } - mTrace("trans:%d, perform update action, old row:%p stage:%d, new row:%p stage:%d", pOld->id, pOld, pOld->stage, pNew, - pNew->stage); + mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld, + mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage)); pOld->stage = pNew->stage; return 0; } -STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) { +static STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) { SSdb *pSdb = pMnode->pSdb; STrans *pTrans = sdbAcquire(pSdb, SDB_TRANS, &transId); if (pTrans == NULL) { @@ -339,7 +387,7 @@ STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) { return pTrans; } -void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) { +static void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) { SSdb *pSdb = pMnode->pSdb; sdbRelease(pSdb, pTrans); } @@ -375,8 +423,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq) { } static void mndTransDropLogs(SArray *pArray) { - if (pArray == NULL) return; - for (int32_t i = 0; i < pArray->size; ++i) { + int32_t size = taosArrayGetSize(pArray); + for (int32_t i = 0; i < size; ++i) { SSdbRaw *pRaw = taosArrayGetP(pArray, i); sdbFreeRaw(pRaw); } @@ -385,10 +433,10 @@ static void mndTransDropLogs(SArray *pArray) { } static void mndTransDropActions(SArray *pArray) { - if (pArray == NULL) return; - for (int32_t i = 0; i < pArray->size; ++i) { + int32_t size = taosArrayGetSize(pArray); + for (int32_t i = 0; i < size; ++i) { STransAction *pAction = taosArrayGet(pArray, i); - free(pAction->pCont); + tfree(pAction->pCont); } taosArrayDestroy(pArray); @@ -941,8 +989,8 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { mndTransSendRpcRsp(pTrans); } -static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) { - mndTransPullup(pMsg->pMnode); +static int32_t mndProcessTransReq(SMnodeMsg *pReq) { + mndTransPullup(pReq->pMnode); return 0; } @@ -960,3 +1008,122 @@ void mndTransPullup(SMnode *pMnode) { sdbWriteFile(pMnode->pSdb); } + +static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pMnode; + SSdb *pSdb = pMnode->pSdb; + + int32_t cols = 0; + SSchema *pSchema = pMeta->pSchemas; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "id"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "create_time"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pShow->bytes[cols] = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "stage"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pShow->bytes[cols] = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "db"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pShow->bytes[cols] = (TSDB_TRANS_DESC_LEN - 1) + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "type"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "last_exec_time"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pShow->bytes[cols] = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "last_error"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pMeta->numOfColumns = cols; + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = sdbGetSize(pSdb, SDB_TRANS); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + strcpy(pMeta->tbName, mndShowStr(pShow->type)); + return 0; +} + +static int32_t mndRetrieveTrans(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pMnode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + STrans *pTrans = NULL; + int32_t cols = 0; + char *pWrite; + + while (numOfRows < rows) { + pShow->pIter = sdbFetch(pSdb, SDB_TRANS, pShow->pIter, (void **)&pTrans); + if (pShow->pIter == NULL) break; + + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t *)pWrite = pTrans->id; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pTrans->createdTime; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, mndTransStr(pTrans->stage)); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, pTrans->dbname); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, mndTransType(pTrans->transType)); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pTrans->lastExecTime; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, pTrans->lastError); + cols++; + + numOfRows++; + sdbRelease(pSdb, pTrans); + } + + mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); + pShow->numOfReads += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 315268a9e3..204cd870f4 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -16,6 +16,8 @@ #define _DEFAULT_SOURCE #include "sdbInt.h" +static void sdbCheck(SSdb *pSdb, SSdbRow *pRow); + const char *sdbTableName(ESdbType type) { switch (type) { case SDB_TRANS: @@ -221,6 +223,8 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * pSdb->tableVer[pOldRow->type]++; sdbFreeRow(pSdb, pRow); + + sdbCheck(pSdb, pOldRow); // sdbRelease(pSdb, pOldRow->pObj); return 0; } @@ -305,6 +309,19 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { return pRet; } +static void sdbCheck(SSdb *pSdb, SSdbRow *pRow) { + SRWLatch *pLock = &pSdb->locks[pRow->type]; + taosRLockLatch(pLock); + + int32_t ref = atomic_load_32(&pRow->refCount); + sdbPrintOper(pSdb, pRow, "checkRow"); + if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) { + sdbFreeRow(pSdb, pRow); + } + + taosRUnLockLatch(pLock); +} + void sdbRelease(SSdb *pSdb, void *pObj) { if (pObj == NULL) return; @@ -332,6 +349,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { SRWLatch *pLock = &pSdb->locks[type]; taosRLockLatch(pLock); +#if 0 if (pIter != NULL) { SSdbRow *pLastRow = *(SSdbRow **)pIter; int32_t ref = atomic_load_32(&pLastRow->refCount); @@ -339,6 +357,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { sdbFreeRow(pSdb, pLastRow); } } +#endif SSdbRow **ppRow = taosHashIterate(hash, pIter); while (ppRow != NULL) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2f70ab5080..4dc46b3798 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -192,12 +192,12 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu if (pTopic->pReadhandle == NULL) { ASSERT(false); } - for (int i = 0; i < TQ_BUFFER_SIZE; i++) { - pTopic->buffer.output[i].status = 0; + for (int j = 0; j < TQ_BUFFER_SIZE; j++) { + pTopic->buffer.output[j].status = 0; STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta}; - pTopic->buffer.output[i].pReadHandle = pReadHandle; - pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); + pTopic->buffer.output[j].pReadHandle = pReadHandle; + pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); } } diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index b7e7ad6f0b..e61375b202 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -264,3 +264,60 @@ void nodesRewriteNodePostOrder(SNode** pNode, FNodeRewriter rewriter, void* pCon void nodesRewriteListPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* pContext) { (void)rewriteList(pList, TRAVERSAL_POSTORDER, rewriter, pContext); } + +void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext) { + if (NULL == pSelect) { + return; + } + + switch (clause) { + case SQL_CLAUSE_FROM: + nodesWalkNode(pSelect->pFromTable, walker, pContext); + nodesWalkNode(pSelect->pWhere, walker, pContext); + case SQL_CLAUSE_WHERE: + nodesWalkList(pSelect->pPartitionByList, walker, pContext); + case SQL_CLAUSE_PARTITION_BY: + nodesWalkNode(pSelect->pWindow, walker, pContext); + case SQL_CLAUSE_WINDOW: + nodesWalkList(pSelect->pGroupByList, walker, pContext); + case SQL_CLAUSE_GROUP_BY: + nodesWalkNode(pSelect->pHaving, walker, pContext); + case SQL_CLAUSE_HAVING: + nodesWalkList(pSelect->pProjectionList, walker, pContext); + case SQL_CLAUSE_SELECT: + nodesWalkList(pSelect->pOrderByList, walker, pContext); + case SQL_CLAUSE_ORDER_BY: + default: + break; + } + + return; +} + +void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext) { + if (NULL == pSelect) { + return; + } + + switch (clause) { + case SQL_CLAUSE_FROM: + nodesRewriteNode(&(pSelect->pFromTable), rewriter, pContext); + nodesRewriteNode(&(pSelect->pWhere), rewriter, pContext); + case SQL_CLAUSE_WHERE: + nodesRewriteList(pSelect->pPartitionByList, rewriter, pContext); + case SQL_CLAUSE_PARTITION_BY: + nodesRewriteNode(&(pSelect->pWindow), rewriter, pContext); + case SQL_CLAUSE_WINDOW: + nodesRewriteList(pSelect->pGroupByList, rewriter, pContext); + case SQL_CLAUSE_GROUP_BY: + nodesRewriteNode(&(pSelect->pHaving), rewriter, pContext); + case SQL_CLAUSE_HAVING: + nodesRewriteList(pSelect->pProjectionList, rewriter, pContext); + case SQL_CLAUSE_SELECT: + nodesRewriteList(pSelect->pOrderByList, rewriter, pContext); + default: + break; + } + + return; +} diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 520a3f82ca..7dc1717233 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -15,8 +15,10 @@ #include "querynodes.h" #include "nodesShowStmts.h" +#include "taos.h" #include "taoserror.h" #include "taos.h" +#include "thash.h" static SNode* makeNode(ENodeType type, size_t size) { SNode* p = calloc(1, size); @@ -99,14 +101,14 @@ SNodeList* nodesMakeList() { return p; } -SNodeList* nodesListAppend(SNodeList* pList, SNode* pNode) { +int32_t nodesListAppend(SNodeList* pList, SNode* pNode) { if (NULL == pList || NULL == pNode) { - return NULL; + return TSDB_CODE_SUCCESS; } SListCell* p = calloc(1, sizeof(SListCell)); if (NULL == p) { - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - return pList; + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_OUT_OF_MEMORY; } p->pNode = pNode; if (NULL == pList->pHead) { @@ -117,7 +119,7 @@ SNodeList* nodesListAppend(SNodeList* pList, SNode* pNode) { } pList->pTail = p; ++(pList->length); - return pList; + return TSDB_CODE_SUCCESS; } SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) { @@ -239,3 +241,99 @@ bool nodesIsTimeorderQuery(const SNode* pQuery) { bool nodesIsTimelineQuery(const SNode* pQuery) { return false; } + +typedef struct SCollectColumnsCxt { + int32_t errCode; + uint64_t tableId; + bool realCol; + SNodeList* pCols; + SHashObj* pColIdHash; +} SCollectColumnsCxt; + +static EDealRes doCollect(SCollectColumnsCxt* pCxt, int32_t id, SNode* pNode) { + if (NULL == taosHashGet(pCxt->pColIdHash, &id, sizeof(id))) { + pCxt->errCode = taosHashPut(pCxt->pColIdHash, &id, sizeof(id), NULL, 0); + if (TSDB_CODE_SUCCESS == pCxt->errCode) { + pCxt->errCode = nodesListAppend(pCxt->pCols, pNode); + } + return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR); + } + return DEAL_RES_CONTINUE; +} + +static EDealRes collectColumns(SNode* pNode, void* pContext) { + SCollectColumnsCxt* pCxt = (SCollectColumnsCxt*)pContext; + + if (pCxt->realCol && QUERY_NODE_COLUMN == nodeType(pNode)) { + SColumnNode* pCol = (SColumnNode*)pNode; + int32_t colId = pCol->colId; + if (pCxt->tableId == pCol->tableId && colId > 0) { + return doCollect(pCxt, colId, pNode); + } + } else if (!pCxt->realCol && QUERY_NODE_COLUMN_REF == nodeType(pNode)) { + return doCollect(pCxt, ((SColumnRefNode*)pNode)->slotId, pNode); + } + return DEAL_RES_CONTINUE; +} + +int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, uint64_t tableId, bool realCol, SNodeList** pCols) { + if (NULL == pSelect || NULL == pCols) { + return TSDB_CODE_SUCCESS; + } + + SCollectColumnsCxt cxt = { + .errCode = TSDB_CODE_SUCCESS, + .realCol = realCol, + .pCols = nodesMakeList(), + .pColIdHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK) + }; + if (NULL == cxt.pCols || NULL == cxt.pColIdHash) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + nodesWalkSelectStmt(pSelect, clause, collectColumns, &cxt); + taosHashCleanup(cxt.pColIdHash); + if (TSDB_CODE_SUCCESS != cxt.errCode) { + nodesDestroyList(cxt.pCols); + return cxt.errCode; + } + *pCols = cxt.pCols; + return TSDB_CODE_SUCCESS; +} + +typedef struct SCollectFuncsCxt { + int32_t errCode; + FFuncClassifier classifier; + SNodeList* pFuncs; +} SCollectFuncsCxt; + +static EDealRes collectFuncs(SNode* pNode, void* pContext) { + SCollectFuncsCxt* pCxt = (SCollectFuncsCxt*)pContext; + if (QUERY_NODE_FUNCTION == nodeType(pNode) && pCxt->classifier(((SFunctionNode*)pNode)->funcId)) { + pCxt->errCode = nodesListAppend(pCxt->pFuncs, pNode); + return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR); + } + return DEAL_RES_CONTINUE; +} + +int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNodeList** pFuncs) { + if (NULL == pSelect || NULL == pFuncs) { + return TSDB_CODE_SUCCESS; + } + + SCollectFuncsCxt cxt = { + .errCode = TSDB_CODE_SUCCESS, + .classifier = classifier, + .pFuncs = nodesMakeList() + }; + if (NULL == cxt.pFuncs) { + return TSDB_CODE_OUT_OF_MEMORY; + } + nodesWalkSelectStmt(pSelect, SQL_CLAUSE_GROUP_BY, collectFuncs, &cxt); + if (TSDB_CODE_SUCCESS != cxt.errCode) { + nodesDestroyList(cxt.pFuncs); + return cxt.errCode; + } + *pFuncs = cxt.pFuncs; + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 1222138b5e..06154ca192 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -76,7 +76,7 @@ cmd ::= SHOW QUERIES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_QUERIES, 0, 0); cmd ::= SHOW CONNECTIONS.{ setShowOptions(pInfo, TSDB_MGMT_TABLE_CONNS, 0, 0);} cmd ::= SHOW STREAMS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_STREAMS, 0, 0); } cmd ::= SHOW VARIABLES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_VARIABLES, 0, 0); } -cmd ::= SHOW SCORES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_SCORES, 0, 0); } +cmd ::= SHOW SCORES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_TRANS, 0, 0); } cmd ::= SHOW GRANTS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_GRANTS, 0, 0); } cmd ::= SHOW VNODES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_VNODES, 0, 0); } diff --git a/source/libs/parser/src/astCreateFuncs.c b/source/libs/parser/src/astCreateFuncs.c index 2615aca7f3..c129ebeef1 100644 --- a/source/libs/parser/src/astCreateFuncs.c +++ b/source/libs/parser/src/astCreateFuncs.c @@ -96,11 +96,17 @@ SToken getTokenFromRawExprNode(SAstCreateContext* pCxt, SNode* pNode) { SNodeList* createNodeList(SAstCreateContext* pCxt, SNode* pNode) { SNodeList* list = nodesMakeList(); CHECK_OUT_OF_MEM(list); - return nodesListAppend(list, pNode); + if (TSDB_CODE_SUCCESS != nodesListAppend(list, pNode)) { + pCxt->valid = false; + } + return list; } SNodeList* addNodeToList(SAstCreateContext* pCxt, SNodeList* pList, SNode* pNode) { - return nodesListAppend(pList, pNode); + if (TSDB_CODE_SUCCESS != nodesListAppend(pList, pNode)) { + pCxt->valid = false; + } + return pList; } SNode* createColumnNode(SAstCreateContext* pCxt, const SToken* pTableAlias, const SToken* pColumnName) { diff --git a/source/libs/parser/src/parserImpl.c b/source/libs/parser/src/parserImpl.c index a2602c42ee..ec53f266bf 100644 --- a/source/libs/parser/src/parserImpl.c +++ b/source/libs/parser/src/parserImpl.c @@ -241,17 +241,6 @@ abort_parse: return cxt.valid ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED; } -typedef enum ESqlClause { - SQL_CLAUSE_FROM = 1, - SQL_CLAUSE_WHERE, - SQL_CLAUSE_PARTITION_BY, - SQL_CLAUSE_WINDOW, - SQL_CLAUSE_GROUP_BY, - SQL_CLAUSE_HAVING, - SQL_CLAUSE_SELECT, - SQL_CLAUSE_ORDER_BY -} ESqlClause; - static bool afterGroupBy(ESqlClause clause) { return clause > SQL_CLAUSE_GROUP_BY; } @@ -298,7 +287,7 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "Not SELECTed expression"; case TSDB_CODE_PAR_NOT_SINGLE_GROUP: return "Not a single-group group function"; - case TSDB_CODE_PAR_OUT_OF_MEMORY: + case TSDB_CODE_OUT_OF_MEMORY: return "Out of memory"; default: return "Unknown error"; @@ -376,7 +365,7 @@ static void setColumnInfoBySchema(const STableNode* pTable, const SSchema* pColS static void setColumnInfoByExpr(const STableNode* pTable, SExprNode* pExpr, SColumnNode* pCol) { pCol->pProjectRef = (SNode*)pExpr; - pExpr->pAssociationList = nodesListAppend(pExpr->pAssociationList, (SNode*)pCol); + nodesListAppend(pExpr->pAssociationList, (SNode*)pCol); if (NULL != pTable) { strcpy(pCol->tableAlias, pTable->tableAlias); } @@ -391,7 +380,7 @@ static int32_t createColumnNodeByTable(STranslateContext* pCxt, const STableNode for (int32_t i = 0; i < nums; ++i) { SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == pCol) { - return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY); + return generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY); } setColumnInfoBySchema(pTable, pMeta->schema + i, pCol); nodesListAppend(pList, (SNode*)pCol); @@ -402,7 +391,7 @@ static int32_t createColumnNodeByTable(STranslateContext* pCxt, const STableNode FOREACH(pNode, pProjectList) { SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == pCol) { - return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY); + return generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY); } setColumnInfoByExpr(pTable, (SExprNode*)pNode, pCol); nodesListAppend(pList, (SNode*)pCol); @@ -572,7 +561,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) { int32_t n = strlen(pVal->literal); pVal->datum.p = calloc(1, n); if (NULL == pVal->datum.p) { - generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY); + generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY); return DEAL_RES_ERROR; } trimStringCopy(pVal->literal, n, pVal->datum.p); @@ -582,7 +571,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) { int32_t n = strlen(pVal->literal); char* tmp = calloc(1, n); if (NULL == tmp) { - generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY); + generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY); return DEAL_RES_ERROR; } int32_t len = trimStringCopy(pVal->literal, n, tmp); @@ -830,7 +819,7 @@ static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect, bool size_t nums = taosArrayGetSize(pTables); pSelect->pProjectionList = nodesMakeList(); if (NULL == pSelect->pProjectionList) { - return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY); + return generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY); } for (size_t i = 0; i < nums; ++i) { STableNode* pTable = taosArrayGetP(pTables, i); @@ -897,7 +886,7 @@ static int32_t translateOrderByPosition(STranslateContext* pCxt, SNodeList* pPro } else { SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == pCol) { - return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY); + return generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY); } setColumnInfoByExpr(NULL, (SExprNode*)nodesListGetNode(pProjectionList, pos - 1), pCol); ((SOrderByExprNode*)pNode)->pExpr = (SNode*)pCol; @@ -1036,7 +1025,7 @@ int32_t setReslutSchema(STranslateContext* pCxt, SQuery* pQuery) { pQuery->numOfResCols = LIST_LENGTH(pSelect->pProjectionList); pQuery->pResSchema = calloc(pQuery->numOfResCols, sizeof(SSchema)); if (NULL == pQuery->pResSchema) { - return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY); + return generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY); } SNode* pNode; int32_t index = 0; diff --git a/source/libs/parser/src/sql.c b/source/libs/parser/src/sql.c index 664f2a3ff2..f6585e41ff 100644 --- a/source/libs/parser/src/sql.c +++ b/source/libs/parser/src/sql.c @@ -2262,7 +2262,7 @@ static void yy_reduce( { setShowOptions(pInfo, TSDB_MGMT_TABLE_VARIABLES, 0, 0); } break; case 13: /* cmd ::= SHOW SCORES */ -{ setShowOptions(pInfo, TSDB_MGMT_TABLE_SCORES, 0, 0); } +{ setShowOptions(pInfo, TSDB_MGMT_TABLE_TRANS, 0, 0); } break; case 14: /* cmd ::= SHOW GRANTS */ { setShowOptions(pInfo, TSDB_MGMT_TABLE_GRANTS, 0, 0); } diff --git a/source/libs/planner/inc/plannerImpl.h b/source/libs/planner/inc/plannerImpl.h index 1f8df990b1..dbfbeb7efc 100644 --- a/source/libs/planner/inc/plannerImpl.h +++ b/source/libs/planner/inc/plannerImpl.h @@ -25,6 +25,7 @@ extern "C" { typedef struct SLogicNode { ENodeType type; + int32_t id; SNodeList* pTargets; SNode* pConditions; SNodeList* pChildren; diff --git a/source/libs/planner/src/plannerImpl.c b/source/libs/planner/src/plannerImpl.c index ddb3023bb3..9b82b4d58e 100644 --- a/source/libs/planner/src/plannerImpl.c +++ b/source/libs/planner/src/plannerImpl.c @@ -16,71 +16,32 @@ #include "plannerImpl.h" #include "functionMgt.h" -static SLogicNode* createQueryLogicNode(SNode* pStmt); +#define CHECK_ALLOC(p, res) \ + do { \ + if (NULL == p) { \ + pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; \ + return res; \ + } \ + } while (0) -typedef struct SCollectColumnsCxt { - SNodeList* pCols; - SHashObj* pColIdHash; -} SCollectColumnsCxt; +#define CHECK_CODE(exec, res) \ + do { \ + int32_t code = exec; \ + if (TSDB_CODE_SUCCESS != code) { \ + pCxt->errCode = code; \ + return res; \ + } \ + } while (0) -static EDealRes doCollectColumns(SNode* pNode, void* pContext) { - if (QUERY_NODE_COLUMN == nodeType(pNode)) { - SCollectColumnsCxt* pCxt = (SCollectColumnsCxt*)pContext; - int16_t colId = ((SColumnNode*)pNode)->colId; - if (colId > 0) { - if (NULL == taosHashGet(pCxt->pColIdHash, &colId, sizeof(colId))) { - taosHashPut(pCxt->pColIdHash, &colId, sizeof(colId), NULL, 0); - nodesListAppend(pCxt->pCols, pNode); - } - } - } - return DEAL_RES_CONTINUE; -} - -static SNodeList* collectColumns(SSelectStmt* pSelect) { - SCollectColumnsCxt cxt = { .pCols = nodesMakeList(), .pColIdHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK) }; - if (NULL == cxt.pCols || NULL == cxt.pColIdHash) { - return NULL; - } - nodesWalkNode(pSelect->pFromTable, doCollectColumns, &cxt); - nodesWalkNode(pSelect->pWhere, doCollectColumns, &cxt); - nodesWalkList(pSelect->pPartitionByList, doCollectColumns, &cxt); - nodesWalkNode(pSelect->pWindow, doCollectColumns, &cxt); - nodesWalkList(pSelect->pGroupByList, doCollectColumns, &cxt); - nodesWalkNode(pSelect->pHaving, doCollectColumns, &cxt); - nodesWalkList(pSelect->pProjectionList, doCollectColumns, &cxt); - nodesWalkList(pSelect->pOrderByList, doCollectColumns, &cxt); - taosHashCleanup(cxt.pColIdHash); - return cxt.pCols; -} - -typedef struct SCollectAggFuncsCxt { - SNodeList* pAggFuncs; -} SCollectAggFuncsCxt; - -static EDealRes doCollectAggFuncs(SNode* pNode, void* pContext) { - if (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsAggFunc(((SFunctionNode*)pNode)->funcId)) { - SCollectAggFuncsCxt* pCxt = (SCollectAggFuncsCxt*)pContext; - nodesListAppend(pCxt->pAggFuncs, pNode); - return DEAL_RES_IGNORE_CHILD; - } - return DEAL_RES_CONTINUE; -} - -static SNodeList* collectAggFuncs(SSelectStmt* pSelect) { - SCollectAggFuncsCxt cxt = { .pAggFuncs = nodesMakeList() }; - if (NULL == cxt.pAggFuncs) { - return NULL; - } - nodesWalkNode(pSelect->pHaving, doCollectAggFuncs, &cxt); - nodesWalkList(pSelect->pProjectionList, doCollectAggFuncs, &cxt); - if (!pSelect->isDistinct) { - nodesWalkList(pSelect->pOrderByList, doCollectAggFuncs, &cxt); - } - return cxt.pAggFuncs; -} +typedef struct SPlanContext { + int32_t errCode; + int32_t planNodeId; + SNodeList* pResource; +} SPlanContext; typedef struct SRewriteExprCxt { + int32_t errCode; + int32_t planNodeId; SNodeList* pTargets; } SRewriteExprCxt; @@ -90,9 +51,15 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) { int32_t index = 0; FOREACH(pTarget, pCxt->pTargets) { if (nodesEqualNode(pTarget, *pNode)) { + SColumnRefNode* pCol = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF); + if (NULL == pCol) { + pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; + return DEAL_RES_ERROR; + } + pCol->tupleId = pCxt->planNodeId; + pCol->slotId = index; nodesDestroyNode(*pNode); - *pNode = nodesMakeNode(QUERY_NODE_COLUMN_REF); - ((SColumnRef*)*pNode)->slotId = index; + *pNode = (SNode*)pCol; return DEAL_RES_IGNORE_CHILD; } ++index; @@ -100,59 +67,93 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) { return DEAL_RES_CONTINUE; } -static int32_t rewriteExpr(SNodeList* pTargets, SSelectStmt* pSelect) { - SRewriteExprCxt cxt = { .pTargets = pTargets }; - nodesRewriteNode(&(pSelect->pFromTable), doRewriteExpr, &cxt); - nodesRewriteNode(&(pSelect->pWhere), doRewriteExpr, &cxt); - nodesRewriteList(pSelect->pPartitionByList, doRewriteExpr, &cxt); - nodesRewriteNode(&(pSelect->pWindow), doRewriteExpr, &cxt); - nodesRewriteList(pSelect->pGroupByList, doRewriteExpr, &cxt); - nodesRewriteNode(&(pSelect->pHaving), doRewriteExpr, &cxt); - nodesRewriteList(pSelect->pProjectionList, doRewriteExpr, &cxt); - nodesRewriteList(pSelect->pOrderByList, doRewriteExpr, &cxt); - return TSDB_CODE_SUCCESS; +static int32_t rewriteExpr(int32_t planNodeId, SNodeList* pTargets, SSelectStmt* pSelect, ESqlClause clause) { + SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .planNodeId = planNodeId, .pTargets = pTargets }; + nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt); + return cxt.errCode; } -static SLogicNode* pushLogicNode(SLogicNode* pRoot, SLogicNode* pNode) { +static SLogicNode* pushLogicNode(SPlanContext* pCxt, SLogicNode* pRoot, SLogicNode* pNode) { + if (TSDB_CODE_SUCCESS != pCxt->errCode) { + goto error; + } + if (NULL == pRoot) { return pNode; } + if (NULL == pNode) { return pRoot; } - pRoot->pParent = pNode; + if (NULL == pNode->pChildren) { pNode->pChildren = nodesMakeList(); + if (NULL == pNode->pChildren) { + goto error; + } } - nodesListAppend(pNode->pChildren, (SNode*)pRoot); + if (TSDB_CODE_SUCCESS != nodesListAppend(pNode->pChildren, (SNode*)pRoot)) { + goto error; + } + pRoot->pParent = pNode; return pNode; +error: + nodesDestroyNode((SNode*)pNode); + return pRoot; } -static SNodeList* createScanTargets(SNodeList* pCols) { - +static SNodeList* createScanTargets(int32_t planNodeId, int32_t numOfScanCols) { + SNodeList* pTargets = nodesMakeList(); + if (NULL == pTargets) { + return NULL; + } + for (int32_t i = 0; i < numOfScanCols; ++i) { + SColumnRefNode* pCol = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF); + if (NULL == pCol || TSDB_CODE_SUCCESS != nodesListAppend(pTargets, (SNode*)pCol)) { + nodesDestroyList(pTargets); + return NULL; + } + pCol->tupleId = planNodeId; + pCol->slotId = i; + } + return pTargets; } -static SLogicNode* createScanLogicNode(SSelectStmt* pSelect, SRealTableNode* pRealTable) { +static SLogicNode* createScanLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable) { SScanLogicNode* pScan = (SScanLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN); - SNodeList* pCols = collectColumns(pSelect); - pScan->pScanCols = nodesCloneList(pCols); - // - rewriteExpr(pScan->pScanCols, pSelect); - pScan->node.pTargets = createScanTargets(pCols); + CHECK_ALLOC(pScan, NULL); + pScan->node.id = pCxt->planNodeId++; + pScan->pMeta = pRealTable->pMeta; + + // set columns to scan + SNodeList* pCols = NULL; + CHECK_CODE(nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pScan->pMeta->uid, true, &pCols), (SLogicNode*)pScan); + pScan->pScanCols = nodesCloneList(pCols); + CHECK_ALLOC(pScan->pScanCols, (SLogicNode*)pScan); + + // pScanCols of SScanLogicNode is equivalent to pTargets of other logic nodes + CHECK_CODE(rewriteExpr(pScan->node.id, pScan->pScanCols, pSelect, SQL_CLAUSE_FROM), (SLogicNode*)pScan); + + // set output + pScan->node.pTargets = createScanTargets(pScan->node.id, LIST_LENGTH(pScan->pScanCols)); + CHECK_ALLOC(pScan->node.pTargets, (SLogicNode*)pScan); + return (SLogicNode*)pScan; } -static SLogicNode* createSubqueryLogicNode(SSelectStmt* pSelect, STempTableNode* pTable) { - return createQueryLogicNode(pTable->pSubquery); +static SLogicNode* createQueryLogicNode(SPlanContext* pCxt, SNode* pStmt); + +static SLogicNode* createSubqueryLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, STempTableNode* pTable) { + return createQueryLogicNode(pCxt, pTable->pSubquery); } -static SLogicNode* createLogicNodeByTable(SSelectStmt* pSelect, SNode* pTable) { +static SLogicNode* createLogicNodeByTable(SPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable) { switch (nodeType(pTable)) { case QUERY_NODE_REAL_TABLE: - return createScanLogicNode(pSelect, (SRealTableNode*)pTable); + return createScanLogicNode(pCxt, pSelect, (SRealTableNode*)pTable); case QUERY_NODE_TEMP_TABLE: - return createSubqueryLogicNode(pSelect, (STempTableNode*)pTable); + return createSubqueryLogicNode(pCxt, pSelect, (STempTableNode*)pTable); case QUERY_NODE_JOIN_TABLE: default: break; @@ -160,45 +161,86 @@ static SLogicNode* createLogicNodeByTable(SSelectStmt* pSelect, SNode* pTable) { return NULL; } -static SLogicNode* createFilterLogicNode(SNode* pWhere) { +static SLogicNode* createFilterLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, SNode* pWhere) { if (NULL == pWhere) { return NULL; } + SFilterLogicNode* pFilter = (SFilterLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_FILTER); + CHECK_ALLOC(pFilter, NULL); + pFilter->node.id = pCxt->planNodeId++; + + // set filter conditions pFilter->node.pConditions = nodesCloneNode(pWhere); + CHECK_ALLOC(pFilter->node.pConditions, (SLogicNode*)pFilter); + + // set the output and rewrite the expression in subsequent clauses with the output + SNodeList* pCols = NULL; + CHECK_CODE(nodesCollectColumns(pSelect, SQL_CLAUSE_WHERE, 0, false, &pCols), (SLogicNode*)pFilter); + pFilter->node.pTargets = nodesCloneList(pCols); + CHECK_ALLOC(pFilter->node.pTargets, (SLogicNode*)pFilter); + CHECK_CODE(rewriteExpr(pFilter->node.id, pFilter->node.pTargets, pSelect, SQL_CLAUSE_WHERE), (SLogicNode*)pFilter); + return (SLogicNode*)pFilter; } -static SLogicNode* createAggLogicNode(SSelectStmt* pSelect, SNodeList* pGroupByList, SNode* pHaving) { - SNodeList* pAggFuncs = collectAggFuncs(pSelect); +static SLogicNode* createAggLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, SNodeList* pGroupByList, SNode* pHaving) { + SNodeList* pAggFuncs = NULL; + CHECK_CODE(nodesCollectFuncs(pSelect, fmIsAggFunc, &pAggFuncs), NULL); if (NULL == pAggFuncs && NULL == pGroupByList) { return NULL; } + SAggLogicNode* pAgg = (SAggLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_AGG); + CHECK_ALLOC(pAgg, NULL); + pAgg->node.id = pCxt->planNodeId++; + + // set grouyp keys, agg funcs and having conditions pAgg->pGroupKeys = nodesCloneList(pGroupByList); + CHECK_ALLOC(pAgg->pGroupKeys, (SLogicNode*)pAgg); pAgg->pAggFuncs = nodesCloneList(pAggFuncs); + CHECK_ALLOC(pAgg->pAggFuncs, (SLogicNode*)pAgg); pAgg->node.pConditions = nodesCloneNode(pHaving); + CHECK_ALLOC(pAgg->node.pConditions, (SLogicNode*)pAgg); + + // set the output and rewrite the expression in subsequent clauses with the output + SNodeList* pCols = NULL; + CHECK_CODE(nodesCollectColumns(pSelect, SQL_CLAUSE_HAVING, 0, false, &pCols), (SLogicNode*)pAgg); + pAgg->node.pTargets = nodesCloneList(pCols); + CHECK_ALLOC(pAgg->node.pTargets, (SLogicNode*)pAgg); + CHECK_CODE(rewriteExpr(pAgg->node.id, pAgg->node.pTargets, pSelect, SQL_CLAUSE_HAVING), (SLogicNode*)pAgg); + return (SLogicNode*)pAgg; } -static SLogicNode* createSelectLogicNode(SSelectStmt* pSelect) { - SLogicNode* pRoot = createLogicNodeByTable(pSelect, pSelect->pFromTable); - pRoot = pushLogicNode(pRoot, createFilterLogicNode(pSelect->pWhere)); - pRoot = pushLogicNode(pRoot, createAggLogicNode(pSelect, pSelect->pGroupByList, pSelect->pHaving)); - // pRoot = pushLogicNode(pRoot, createProjectLogicNode(pSelect, pSelect->pProjectionList)); +static SLogicNode* createSelectLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) { + SLogicNode* pRoot = createLogicNodeByTable(pCxt, pSelect, pSelect->pFromTable); + if (TSDB_CODE_SUCCESS == pCxt->errCode) { + pRoot = pushLogicNode(pCxt, pRoot, createFilterLogicNode(pCxt, pSelect, pSelect->pWhere)); + } + if (TSDB_CODE_SUCCESS == pCxt->errCode) { + pRoot = pushLogicNode(pCxt, pRoot, createAggLogicNode(pCxt, pSelect, pSelect->pGroupByList, pSelect->pHaving)); + } + // pRoot = pushLogicNode(pCxt, pRoot, createProjectLogicNode(pSelect, pSelect->pProjectionList)); return pRoot; } -static SLogicNode* createQueryLogicNode(SNode* pStmt) { +static SLogicNode* createQueryLogicNode(SPlanContext* pCxt, SNode* pStmt) { switch (nodeType(pStmt)) { case QUERY_NODE_SELECT_STMT: - return createSelectLogicNode((SSelectStmt*)pStmt); + return createSelectLogicNode(pCxt, (SSelectStmt*)pStmt); default: break; } } int32_t createLogicPlan(SNode* pNode, SLogicNode** pLogicNode) { - *pLogicNode = createQueryLogicNode(pNode); + SPlanContext cxt = { .errCode = TSDB_CODE_SUCCESS, .planNodeId = 0 }; + SLogicNode* pRoot = createQueryLogicNode(&cxt, pNode); + if (TSDB_CODE_SUCCESS != cxt.errCode) { + nodesDestroyNode((SNode*)pRoot); + return cxt.errCode; + } + *pLogicNode = pRoot; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 9f678b4528..4cd040c238 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -137,7 +137,7 @@ int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransp .pCont = pMsg, .contLen = pInfo->msgInfo.len, .ahandle = (void*) pInfo, - .handle = NULL, + .handle = pInfo->msgInfo.handle, .code = 0 }; diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 94c833307f..42270cd645 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -95,6 +95,7 @@ typedef struct SSchTask { int32_t childReady; // child task ready number SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* + void* handle; // task send handle } SSchTask; typedef struct SSchJobAttr { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index c14ae40b59..45e125608a 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -18,6 +18,10 @@ #include "query.h" #include "catalog.h" +typedef struct SSchTrans { + void *transInst; + void *transHandle; +}SSchTrans; static SSchedulerMgmt schMgmt = {0}; uint64_t schGenTaskId(void) { @@ -932,6 +936,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in pTask = *task; SCH_TASK_DLOG("rsp msg received, type:%s, code:%s", TMSG_INFO(msgType), tstrerror(rspCode)); + pTask->handle = pMsg->handle; SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); _return: @@ -1000,6 +1005,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) { int32_t code = 0; + + SSchTrans *trans = (SSchTrans *)transport; + SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo)); @@ -1018,14 +1026,16 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t param->queryId = qId; param->taskId = tId; + pMsgSendInfo->param = param; pMsgSendInfo->msgInfo.pData = msg; pMsgSendInfo->msgInfo.len = msgSize; + pMsgSendInfo->msgInfo.handle = trans->transHandle; pMsgSendInfo->msgType = msgType; pMsgSendInfo->fp = fp; int64_t transporterId = 0; - code = asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo); + code = asyncSendMsgToServer(trans->transInst, epSet, &transporterId, pMsgSendInfo); if (code) { SCH_ERR_JRET(code); } @@ -1149,7 +1159,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, atomic_store_32(&pTask->lastMsgType, msgType); - SCH_ERR_JRET(schAsyncSendMsg(pJob->transport, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize)); + SSchTrans trans = {.transInst = pJob->transport, .transHandle = pTask->handle}; + SCH_ERR_JRET(schAsyncSendMsg(&trans, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize)); if (isCandidateAddr) { SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr)); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9cab863ed7..f1bd1ba980 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -133,15 +133,18 @@ static void clientHandleResp(SCliConn* conn) { rpcMsg.msgType = pHead->msgType; rpcMsg.ahandle = pCtx->ahandle; - if (rpcMsg.msgType == TDMT_VND_QUERY_RSP || rpcMsg.msgType == TDMT_VND_FETCH_RSP) { + if (rpcMsg.msgType == TDMT_VND_QUERY_RSP || rpcMsg.msgType == TDMT_VND_FETCH_RSP || + rpcMsg.msgType == TDMT_VND_RES_READY) { rpcMsg.handle = conn; conn->persist = 1; + tDebug("client conn %p persist by app", conn); } tDebug("client conn %p %s received from %s:%d, local info: %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port)); + conn->secured = pHead->secured; if (conn->push != NULL && conn->ctnRdCnt != 0) { (*conn->push->callback)(conn->push->arg, &rpcMsg); conn->push = NULL; @@ -156,7 +159,6 @@ static void clientHandleResp(SCliConn* conn) { } } conn->ctnRdCnt += 1; - conn->secured = pHead->secured; // buf's mem alread translated to rpcMsg.pCont transClearBuffer(&conn->readBuf); @@ -166,16 +168,14 @@ static void clientHandleResp(SCliConn* conn) { SCliThrdObj* pThrd = conn->hostThrd; // user owns conn->persist = 1 - if (conn->push == NULL || conn->persist == 0) { + if (conn->push == NULL && conn->persist == 0) { addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); - - destroyCmsg(conn->data); - conn->data = NULL; } - + destroyCmsg(conn->data); + conn->data = NULL; // start thread's timer of conn pool if not active if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) { - uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); + // uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } } static void clientHandleExcept(SCliConn* pConn) { @@ -330,6 +330,9 @@ static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_b } static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { // impl later + if (handle->data == NULL) { + return; + } SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) {