From e000addc2c63b89d95c0c0816148a2080c9510e8 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 9 May 2022 13:34:26 +0800 Subject: [PATCH 1/7] fix(query): fix cast function regression brought by other changes --- source/libs/parser/src/parAstCreater.c | 6 ++---- source/libs/scalar/src/sclfunc.c | 4 ++++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index e8ac562072..04ef854b5c 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -363,10 +363,8 @@ SNode* createCastFunctionNode(SAstCreateContext* pCxt, SNode* pExpr, SDataType d CHECK_OUT_OF_MEM(func); strcpy(func->functionName, "cast"); func->node.resType = dt; - if (TSDB_DATA_TYPE_VARCHAR == dt.type) { - func->node.resType.bytes = func->node.resType.bytes + VARSTR_HEADER_SIZE; - } else if (TSDB_DATA_TYPE_NCHAR == dt.type) { - func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; + if (TSDB_DATA_TYPE_NCHAR == dt.type) { + func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE; } nodesListMakeAppend(&func->pParameterList, pExpr); return (SNode*)func; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 5fa8eb0c9a..5127ae534a 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -709,6 +709,10 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp int16_t outputType = GET_PARAM_TYPE(&pOutput[0]); int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]); + if (IS_VAR_DATA_TYPE(outputType)) { + outputLen += VARSTR_HEADER_SIZE; + } + char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1); char *output = outputBuf; From 84d611177de0038ad547f542eacf1683e3b2b40b Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 9 May 2022 14:18:27 +0800 Subject: [PATCH 2/7] add snode udfcOpen/udfcClose --- source/dnode/mgmt/mgmt_snode/src/smInt.c | 8 ++++++++ source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/source/dnode/mgmt/mgmt_snode/src/smInt.c b/source/dnode/mgmt/mgmt_snode/src/smInt.c index 3757dcd72a..26300a9fe3 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smInt.c +++ b/source/dnode/mgmt/mgmt_snode/src/smInt.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "smInt.h" +#include "libs/function/function.h" static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dmReadFile(pWrapper, required); } @@ -29,6 +30,9 @@ static void smClose(SMgmtWrapper *pWrapper) { if (pMgmt == NULL) return; dInfo("snode-mgmt start to cleanup"); + + udfcClose(); + if (pMgmt->pSnode != NULL) { smStopWorker(pMgmt); sndClose(pMgmt->pSnode); @@ -68,6 +72,10 @@ int32_t smOpen(SMgmtWrapper *pWrapper) { } dmReportStartup(pWrapper->pDnode, "snode-worker", "initialized"); + if (udfcOpen() != 0) { + dError("failed to open udfc in snode"); + } + return 0; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 17c7563885..bd282c5707 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -337,7 +337,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { dmReportStartup(pDnode, "vnode-vnodes", "initialized"); if (udfcOpen() != 0) { - dError("failed to open udfc in dnode"); + dError("failed to open udfc in vnode"); } code = 0; From b4c7118a51b296a65e5fc1d2c6fd10671de00fef Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 9 May 2022 15:13:43 +0800 Subject: [PATCH 3/7] fix(query): fix cast ncahr bug --- source/libs/scalar/src/sclfunc.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 5fa8eb0c9a..0161323e37 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -822,7 +822,7 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp varDataSetLen(output, len); } //for constant conversion, need to set proper length of pOutput description - if (len < outputLen - VARSTR_HEADER_SIZE) { + if (len < outputLen) { pOutput->columnData->info.bytes = len; } break; From b3df34ef08e33cf7daf12879e2c48b5d3e5eec3b Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 9 May 2022 15:16:47 +0800 Subject: [PATCH 4/7] Revert "fix(query): fix cast function regression brought by other changes" This reverts commit e000addc2c63b89d95c0c0816148a2080c9510e8. --- source/libs/parser/src/parAstCreater.c | 6 ++++-- source/libs/scalar/src/sclfunc.c | 4 ---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 04ef854b5c..e8ac562072 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -363,8 +363,10 @@ SNode* createCastFunctionNode(SAstCreateContext* pCxt, SNode* pExpr, SDataType d CHECK_OUT_OF_MEM(func); strcpy(func->functionName, "cast"); func->node.resType = dt; - if (TSDB_DATA_TYPE_NCHAR == dt.type) { - func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE; + if (TSDB_DATA_TYPE_VARCHAR == dt.type) { + func->node.resType.bytes = func->node.resType.bytes + VARSTR_HEADER_SIZE; + } else if (TSDB_DATA_TYPE_NCHAR == dt.type) { + func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; } nodesListMakeAppend(&func->pParameterList, pExpr); return (SNode*)func; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index a727d80170..0161323e37 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -709,10 +709,6 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp int16_t outputType = GET_PARAM_TYPE(&pOutput[0]); int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]); - if (IS_VAR_DATA_TYPE(outputType)) { - outputLen += VARSTR_HEADER_SIZE; - } - char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1); char *output = outputBuf; From 39bf90c0daa23ff6aba14dc71330c981ff48aaa9 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 9 May 2022 15:17:33 +0800 Subject: [PATCH 5/7] fix(sync): fix memory leak --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 6 ++++++ source/libs/sync/src/syncCommit.c | 2 ++ 2 files changed, 8 insertions(+) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 11afe33335..2baa8b8942 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -198,6 +198,9 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO rsp.refId = pMsg->rpcMsg.refId; tmsgSendRsp(&rsp); } + + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); } } @@ -211,6 +214,9 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf // todo SRpcMsg *pRsp = NULL; (void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); + + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); } } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 8b54b0a090..5febb9a14c 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -72,6 +72,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { "pSyncNode->pRaftStore->currentTerm:%lu", pEntry->term, pSyncNode->pRaftStore->currentTerm); } + + syncEntryDestory(pEntry); } } From 205c92e472475da5cbd0fc41299eb849ab1a4063 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 9 May 2022 15:41:32 +0800 Subject: [PATCH 6/7] fix(rpc): fix duplicat port error --- source/os/src/osSocket.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 9e29f44ed3..2410586287 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -746,7 +746,8 @@ bool taosValidIpAndPort(uint32_t ip, uint16_t port) { return false; } taosCloseSocket(&pSocket); - return 0 == taosValidIp(ip) ? true : false; + return true; + // return 0 == taosValidIp(ip) ? true : false; } TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { struct sockaddr_in serverAdd; From 661fbd2e46db647e5d7eb9ddd68daa2af09abc82 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 9 May 2022 16:35:59 +0800 Subject: [PATCH 7/7] enh(stream): improve topic show --- example/src/tstream.c | 2 +- include/client/taos.h | 13 +-- include/util/tdef.h | 14 ++- source/client/src/tmq.c | 2 +- source/common/src/systable.c | 4 +- source/dnode/mnode/impl/inc/mndTopic.h | 2 + source/dnode/mnode/impl/src/mndConsumer.c | 112 ++++++++++++---------- source/dnode/mnode/impl/src/mndStream.c | 2 +- source/dnode/mnode/impl/src/mndTopic.c | 5 + source/libs/wal/src/walRead.c | 33 +++---- tests/script/tsim/tstream/basic0.sim | 2 +- 11 files changed, 102 insertions(+), 89 deletions(-) diff --git a/example/src/tstream.c b/example/src/tstream.c index 65fd005954..537bfebede 100644 --- a/example/src/tstream.c +++ b/example/src/tstream.c @@ -83,7 +83,7 @@ int32_t create_stream() { /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ pRes = taos_query( pConn, - "create stream stream1 trigger window_close into outstb as select _wstartts, min(k), max(k), sum(k) as sum_of_k " + "create stream stream1 trigger at_once into outstb as select _wstartts, min(k), max(k), sum(k) as sum_of_k " "from tu1 interval(10m)"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); diff --git a/include/client/taos.h b/include/client/taos.h index 6e20900668..26d4d18234 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -221,15 +221,12 @@ DLL_EXPORT void tmq_list_destroy(tmq_list_t *); DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *); DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *); -#if 0 -DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen); -#endif - DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); /* ------------------------TMQ CONSUMER INTERFACE------------------------ */ + DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); @@ -240,6 +237,7 @@ DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t * DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset); #endif + /* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ enum tmq_conf_res_t { @@ -268,12 +266,9 @@ DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res); DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message); #endif -/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */ -#if 0 -DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); -DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const char *tbName, const char *sql); -#endif + /* ------------------------------ TMQ END -------------------------------- */ + #if 1 // Shuduo: temporary enable for app build typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code); #endif diff --git a/include/util/tdef.h b/include/util/tdef.h index 4669a29883..022fd8ba8e 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -86,10 +86,9 @@ extern const int32_t TYPE_BYTES[15]; #define TS_PATH_DELIMITER "." #define TS_ESCAPE_CHAR '`' - -#define TSDB_TIME_PRECISION_MILLI 0 -#define TSDB_TIME_PRECISION_MICRO 1 -#define TSDB_TIME_PRECISION_NANO 2 +#define TSDB_TIME_PRECISION_MILLI 0 +#define TSDB_TIME_PRECISION_MICRO 1 +#define TSDB_TIME_PRECISION_NANO 2 #define TSDB_TIME_PRECISION_HOURS 3 #define TSDB_TIME_PRECISION_MINUTES 4 #define TSDB_TIME_PRECISION_SECONDS 5 @@ -249,7 +248,6 @@ typedef enum ELogicConditionType { #define TSDB_SHOW_SQL_LEN 512 #define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_SHOW_SUBQUERY_LEN 1000 -#define TSDB_SHOW_LIST_LEN 1000 #define TSDB_TRANS_STAGE_LEN 12 #define TSDB_TRANS_TYPE_LEN 16 @@ -376,9 +374,9 @@ typedef enum ELogicConditionType { * 1. ordinary sub query for select * from super_table * 2. all sqlobj generated by createSubqueryObj with this flag */ -#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type -#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file -#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type +#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type +#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file +#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type #define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0) #define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type)) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 9280756a8a..d674b8286b 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -395,7 +395,7 @@ tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { } for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i); - tmq_list_append(*topics, topic->topicName); + tmq_list_append(*topics, strchr(topic->topicName, '.') + 1); } return TMQ_RESP_ERR__SUCCESS; } diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 51f924280a..4b88b5b384 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -14,9 +14,9 @@ */ #include "systable.h" +#include "taos.h" #include "tdef.h" #include "types.h" -#include "taos.h" #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) @@ -264,7 +264,7 @@ static const SSysDbTableSchema consumerSchema[] = { {.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "topics", .bytes = TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, diff --git a/source/dnode/mnode/impl/inc/mndTopic.h b/source/dnode/mnode/impl/inc/mndTopic.h index e3174a90a2..be3f9c3283 100644 --- a/source/dnode/mnode/impl/inc/mndTopic.h +++ b/source/dnode/mnode/impl/inc/mndTopic.h @@ -33,6 +33,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw); int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); +const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 23a87b4691..6c77c379e0 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -790,71 +790,83 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer); if (pShow->pIter == NULL) break; - SColumnInfoData *pColInfo; - int32_t cols = 0; - taosRLockLatch(&pConsumer->lock); - // consumer id - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false); + int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics); + bool hasTopic = true; + if (topicSz == 0) { + hasTopic = false; + topicSz = 1; + } - // group id - char groupId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(varDataVal(groupId), pConsumer->cgroup, TSDB_CGROUP_LEN); - varDataSetLen(groupId, strlen(varDataVal(groupId))); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)groupId, false); + for (int32_t i = 0; i < topicSz; i++) { + if (numOfRows + topicSz > rowsCapacity) { + blockDataEnsureCapacity(pBlock, numOfRows + topicSz); + } + SColumnInfoData *pColInfo; + int32_t cols = 0; - // app id - char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(varDataVal(appId), pConsumer->appId, TSDB_CGROUP_LEN); - varDataSetLen(appId, strlen(varDataVal(appId))); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)appId, false); + // consumer id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false); - // status - char status[20 + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20); - varDataSetLen(status, strlen(varDataVal(status))); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)status, false); + // group id + char groupId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + tstrncpy(varDataVal(groupId), pConsumer->cgroup, TSDB_CGROUP_LEN); + varDataSetLen(groupId, strlen(varDataVal(groupId))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)groupId, false); - // subscribed topics - // TODO: split into multiple rows - char topics[TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE] = {0}; - char *showStr = taosShowStrArray(pConsumer->assignedTopics); - tstrncpy(varDataVal(topics), showStr, TSDB_SHOW_LIST_LEN); - taosMemoryFree(showStr); - varDataSetLen(topics, strlen(varDataVal(topics))); + // app id + char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + tstrncpy(varDataVal(appId), pConsumer->appId, TSDB_CGROUP_LEN); + varDataSetLen(appId, strlen(varDataVal(appId))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)appId, false); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)topics, false); + // status + char status[20 + VARSTR_HEADER_SIZE] = {0}; + tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20); + varDataSetLen(status, strlen(varDataVal(status))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)status, false); - // pid - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->pid, true); + // one subscribed topic + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (hasTopic) { + char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i)); + tstrncpy(varDataVal(topic), topicName, TSDB_TOPIC_FNAME_LEN); + varDataSetLen(topic, strlen(varDataVal(topic))); + colDataAppend(pColInfo, numOfRows, (const char *)topic, false); + } else { + colDataAppend(pColInfo, numOfRows, NULL, true); + } - // end point - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true); + // pid + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->pid, true); - // up time - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false); + // end point + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true); - // subscribe time - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false); + // up time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false); - // rebalance time - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0); + // subscribe time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false); + // rebalance time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0); + + numOfRows++; + } taosRUnLockLatch(&pConsumer->lock); sdbRelease(pSdb, pConsumer); - - numOfRows++; } pShow->numOfRows += numOfRows; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8c1557b73d..599f0d5fef 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -259,7 +259,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast return -1; } -#if 1 +#if 0 printf("|"); for (int i = 0; i < pStream->outputSchema.nCols; i++) { printf(" %15s |", (char *)pStream->outputSchema.pSchema[i].name); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 0a8d1cee4a..63d429df9e 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -61,6 +61,11 @@ int32_t mndInitTopic(SMnode *pMnode) { void mndCleanupTopic(SMnode *pMnode) {} +const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) { + // + return strchr(topic, '.') + 1; +} + SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 4fe07029f1..e14515286e 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -55,15 +55,15 @@ int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) { } static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) { - int code = 0; + int ret = 0; TdFilePtr pIdxTFile = pRead->pReadIdxTFile; TdFilePtr pLogTFile = pRead->pReadLogTFile; // seek position int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry); - code = taosLSeekFile(pIdxTFile, offset, SEEK_SET); - if (code < 0) { + ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET); + if (ret < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -72,14 +72,14 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } - // TODO:deserialize + ASSERT(entry.ver == ver); - code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET); - if (code < 0) { + ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET); + if (ret < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - return code; + return ret; } static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { @@ -108,7 +108,6 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { } static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { - int code; SWal *pWal = pRead->pWal; if (ver == pRead->curVersion) { return 0; @@ -126,16 +125,15 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); ASSERT(pRet != NULL); if (pRead->curFileFirstVer != pRet->firstVer) { - code = walReadChangeFile(pRead, pRet->firstVer); - if (code < 0) { + if (walReadChangeFile(pRead, pRet->firstVer) < 0) { return -1; } } - code = walReadSeekFilePos(pRead, pRet->firstVer, ver); - if (code < 0) { + if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) { return -1; } + pRead->curVersion = ver; return 0; @@ -246,8 +244,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { int code; // TODO: check wal life if (pRead->curVersion != ver) { - code = walReadSeekVer(pRead, ver); - if (code < 0) { + if (walReadSeekVer(pRead, ver) < 0) { return -1; } } @@ -278,8 +275,12 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { pRead->capacity = pRead->pHead->head.bodyLen; } - if (pRead->pHead->head.bodyLen != - taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) { + if ((code = taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) != + pRead->pHead->head.bodyLen) { + if (code < 0) + terrno = TAOS_SYSTEM_ERROR(errno); + else + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } diff --git a/tests/script/tsim/tstream/basic0.sim b/tests/script/tsim/tstream/basic0.sim index 2a1bd14531..9edad991dc 100644 --- a/tests/script/tsim/tstream/basic0.sim +++ b/tests/script/tsim/tstream/basic0.sim @@ -33,7 +33,7 @@ if $rows != 3 then return -1 endi -sql create stream s1 into outstb as select _wstartts, min(k), max(k), sum(k) as sum_alias from ct1 interval(10m) +sql create stream s1 trigger at_once into outstb as select _wstartts, min(k), max(k), sum(k) as sum_alias from ct1 interval(10m) sql show stables if $rows != 2 then