From 44a867b751cfbf75b1e6883f1977a50826833652 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 4 Jan 2023 14:42:03 +0800 Subject: [PATCH 1/3] fix: 'show user privilegs' command echo user name incomplete --- source/dnode/mnode/impl/src/mndUser.c | 8 ++++---- source/libs/parser/src/parTranslater.c | 18 +++++++++++------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index efce6255fb..85a92c7aef 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -884,9 +884,9 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock db = taosHashIterate(pUser->writeDbs, NULL); while (db != NULL) { cols = 0; - SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; + char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(userName, pUser->user, pShow->pMeta->pSchemas[cols].bytes); + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)userName, false); char privilege[20] = {0}; @@ -909,9 +909,9 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock char *topic = taosHashIterate(pUser->topics, NULL); while (topic != NULL) { cols = 0; - SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; + char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(userName, pUser->user, pShow->pMeta->pSchemas[cols].bytes); + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)userName, false); char privilege[20] = {0}; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index c1982f3c7e..991d3bc553 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5527,7 +5527,8 @@ static void getStreamQueryFirstProjectAliasName(SHashObj* pUserAliasSet, char* a return; } -static int32_t addWstartTsToCreateStreamQueryImpl(SSelectStmt* pSelect, SHashObj* pUserAliasSet) { +static int32_t addWstartTsToCreateStreamQueryImpl(STranslateContext* pCxt, SSelectStmt* pSelect, + SHashObj* pUserAliasSet) { SNode* pProj = nodesListGetNode(pSelect->pProjectionList, 0); if (NULL == pSelect->pWindow || (QUERY_NODE_FUNCTION == nodeType(pProj) && 0 == strcmp("_wstart", ((SFunctionNode*)pProj)->functionName))) { @@ -5539,7 +5540,10 @@ static int32_t addWstartTsToCreateStreamQueryImpl(SSelectStmt* pSelect, SHashObj } strcpy(pFunc->functionName, "_wstart"); getStreamQueryFirstProjectAliasName(pUserAliasSet, pFunc->node.aliasName, sizeof(pFunc->node.aliasName)); - int32_t code = nodesListPushFront(pSelect->pProjectionList, (SNode*)pFunc); + int32_t code = getFuncInfo(pCxt, pFunc); + if (TSDB_CODE_SUCCESS != code) { + code = nodesListPushFront(pSelect->pProjectionList, (SNode*)pFunc); + } if (TSDB_CODE_SUCCESS != code) { nodesDestroyNode((SNode*)pFunc); } @@ -5551,7 +5555,7 @@ static int32_t addWstartTsToCreateStreamQuery(STranslateContext* pCxt, SNode* pS SHashObj* pUserAliasSet = NULL; int32_t code = checkProjectAlias(pCxt, pSelect->pProjectionList, &pUserAliasSet); if (TSDB_CODE_SUCCESS == code) { - code = addWstartTsToCreateStreamQueryImpl(pSelect, pUserAliasSet); + code = addWstartTsToCreateStreamQueryImpl(pCxt, pSelect, pUserAliasSet); } taosHashCleanup(pUserAliasSet); return code; @@ -5664,13 +5668,13 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) { pCxt->createStream = true; - int32_t code = addWstartTsToCreateStreamQuery(pCxt, pStmt->pQuery); - if (TSDB_CODE_SUCCESS == code) { - code = addSubtableInfoToCreateStreamQuery(pCxt, pStmt); - } + int32_t code = addSubtableInfoToCreateStreamQuery(pCxt, pStmt); if (TSDB_CODE_SUCCESS == code) { code = translateQuery(pCxt, pStmt->pQuery); } + if (TSDB_CODE_SUCCESS == code) { + code = addWstartTsToCreateStreamQuery(pCxt, pStmt->pQuery); + } if (TSDB_CODE_SUCCESS == code) { code = checkStreamQuery(pCxt, pStmt); } From 90c1ed107cbdaab94733597e3d3f890984ba757c Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 4 Jan 2023 17:09:59 +0800 Subject: [PATCH 2/3] fix: duplicate column check in 'create stream' command --- source/libs/parser/src/parTranslater.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 991d3bc553..05d49bb027 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5541,7 +5541,7 @@ static int32_t addWstartTsToCreateStreamQueryImpl(STranslateContext* pCxt, SSele strcpy(pFunc->functionName, "_wstart"); getStreamQueryFirstProjectAliasName(pUserAliasSet, pFunc->node.aliasName, sizeof(pFunc->node.aliasName)); int32_t code = getFuncInfo(pCxt, pFunc); - if (TSDB_CODE_SUCCESS != code) { + if (TSDB_CODE_SUCCESS == code) { code = nodesListPushFront(pSelect->pProjectionList, (SNode*)pFunc); } if (TSDB_CODE_SUCCESS != code) { From 442902e5cbccf72a3c4dc5ba345a0422a143320b Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 4 Jan 2023 19:16:35 +0800 Subject: [PATCH 3/3] fix: send rpc response on closing sync or failing to enqueue --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 11 ++++++++++- source/dnode/vnode/src/vnd/vnodeSvr.c | 1 - source/libs/sync/src/syncMain.c | 9 ++++----- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 642ca1ebc1..cd29b11550 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -134,6 +134,13 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf } } +static void vmSendResponse(SRpcMsg *pMsg) { + if (pMsg->info.handle) { + SRpcMsg rsp = {.info = pMsg->info, .code = terrno}; + rpcSendResponse(&rsp); + } +} + static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) { const STraceId *trace = &pMsg->info.traceId; if (pMsg->contLen < sizeof(SMsgHead)) { @@ -152,7 +159,9 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp if (pVnode == NULL) { dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg, terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen); - return terrno != 0 ? terrno : -1; + terrno = (terrno != 0) ? terrno : -1; + vmSendResponse(pMsg); + return terrno; } switch (qtype) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 8d53579483..ac6406fc23 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -182,7 +182,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version, pVnode->state.applied); terrno = TSDB_CODE_VND_DUP_REQUEST; - pRsp->info.handle = NULL; return -1; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index bf220178a4..32ae9e391c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1252,6 +1252,9 @@ void syncNodePreClose(SSyncNode* pSyncNode) { // stop heartbeat timer syncNodeStopHeartbeatTimer(pSyncNode); + + // clean rsp + syncRespCleanRsp(pSyncNode->pSyncRespMgr); } void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); } @@ -2667,16 +2670,12 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn } int32_t code = syncNodeAppend(ths, pEntry); - if (code < 0) { - sNError(ths, "failed to append blocking msg"); - } return code; } else { syncEntryDestroy(pEntry); pEntry = NULL; + return -1; } - - return -1; } int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {