diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index f00f007e5d..1431410247 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -46,6 +46,10 @@ extern "C" { #define SHOW_LOCAL_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE) #define SHOW_LOCAL_VARIABLES_RESULT_FIELD3_LEN (TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE) +#define COMPACT_DB_RESULT_COLS 3 +#define COMPACT_DB_RESULT_FIELD1_LEN 32 +#define COMPACT_DB_RESULT_FIELD3_LEN 128 + #define SHOW_ALIVE_RESULT_COLS 1 #define BIT_FLAG_MASK(n) (1 << n) diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 693efbc364..9ccf1cb80d 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -16,6 +16,7 @@ #include "catalog.h" #include "clientInt.h" #include "clientLog.h" +#include "cmdnodes.h" #include "os.h" #include "query.h" #include "systable.h" @@ -541,6 +542,118 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) { return code; } +static int32_t buildCompactDbBlock(SCompactDbRsp* pRsp, SSDataBlock** block) { + SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + pBlock->info.hasVarCol = true; + + pBlock->pDataBlock = taosArrayInit(COMPACT_DB_RESULT_COLS, sizeof(SColumnInfoData)); + + SColumnInfoData infoData = {0}; + infoData.info.type = TSDB_DATA_TYPE_VARCHAR; + infoData.info.bytes = COMPACT_DB_RESULT_FIELD1_LEN; + taosArrayPush(pBlock->pDataBlock, &infoData); + + infoData.info.type = TSDB_DATA_TYPE_BIGINT; + infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; + taosArrayPush(pBlock->pDataBlock, &infoData); + + infoData.info.type = TSDB_DATA_TYPE_VARCHAR; + infoData.info.bytes = COMPACT_DB_RESULT_FIELD3_LEN; + taosArrayPush(pBlock->pDataBlock, &infoData); + + blockDataEnsureCapacity(pBlock, 1); + + SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0); + SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1); + SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2); + char result[COMPACT_DB_RESULT_FIELD1_LEN] = {0}; + char reason[COMPACT_DB_RESULT_FIELD3_LEN] = {0}; + if (pRsp->bAccepted) { + STR_TO_VARSTR(result, "accepted"); + colDataSetVal(pResultCol, 0, result, false); + colDataSetVal(pIdCol, 0, (void*)&pRsp->compactId, false); + STR_TO_VARSTR(reason, "success"); + colDataSetVal(pReasonCol, 0, reason, false); + } else { + STR_TO_VARSTR(result, "rejected"); + colDataSetVal(pResultCol, 0, result, false); + colDataSetNULL(pIdCol, 0); + STR_TO_VARSTR(reason, "compaction is ongoing"); + colDataSetVal(pReasonCol, 0, reason, false); + } + pBlock->info.rows = 1; + + *block = pBlock; + + return TSDB_CODE_SUCCESS; +} + +static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetrieveTableRsp** pRsp) { + SSDataBlock* pBlock = NULL; + int32_t code = buildCompactDbBlock(pCompactDb, &pBlock); + if (code) { + return code; + } + + size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); + *pRsp = taosMemoryCalloc(1, rspSize); + if (NULL == *pRsp) { + blockDataDestroy(pBlock); + return TSDB_CODE_OUT_OF_MEMORY; + } + + (*pRsp)->useconds = 0; + (*pRsp)->completed = 1; + (*pRsp)->precision = 0; + (*pRsp)->compressed = 0; + (*pRsp)->compLen = 0; + (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); + (*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS); + + int32_t len = blockEncode(pBlock, (*pRsp)->data, COMPACT_DB_RESULT_COLS); + blockDataDestroy(pBlock); + + if (len != rspSize - sizeof(SRetrieveTableRsp)) { + uError("buildRetriveTableRspForCompactDb error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len, + (uint64_t)(rspSize - sizeof(SRetrieveTableRsp))); + return TSDB_CODE_TSC_INVALID_INPUT; + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t processCompactDbRsp(void* param, SDataBuf* pMsg, int32_t code) { + SRequestObj* pRequest = param; + if (code != TSDB_CODE_SUCCESS) { + setErrno(pRequest, code); + } else { + SCompactDbRsp rsp = {0}; + SRetrieveTableRsp* pRes = NULL; + code = tDeserializeSCompactDbRsp(pMsg->pData, pMsg->len, &rsp); + if (TSDB_CODE_SUCCESS == code) { + code = buildRetriveTableRspForCompactDb(&rsp, &pRes); + } + if (TSDB_CODE_SUCCESS == code) { + code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, true); + } + + if (code != 0) { + taosMemoryFree(pRes); + } + } + + taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); + + if (pRequest->body.queryFp != NULL) { + pRequest->body.queryFp(pRequest->body.param, pRequest, code); + } else { + tsem_post(&pRequest->body.rspSem); + } + return code; +} + __async_send_cb_fn_t getMsgRspHandle(int32_t msgType) { switch (msgType) { case TDMT_MND_CONNECT: @@ -557,6 +670,8 @@ __async_send_cb_fn_t getMsgRspHandle(int32_t msgType) { return processAlterStbRsp; case TDMT_MND_SHOW_VARIABLES: return processShowVariablesRsp; + case TDMT_MND_COMPACT_DB: + return processCompactDbRsp; default: return genericRspCallback; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 5260e4a2bb..126974bb40 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -8732,6 +8732,28 @@ static int32_t extractShowVariablesResultSchema(int32_t* numOfCols, SSchema** pS return TSDB_CODE_SUCCESS; } +static int32_t extractCompactDbResultSchema(int32_t* numOfCols, SSchema** pSchema) { + *numOfCols = COMPACT_DB_RESULT_COLS; + *pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema)); + if (NULL == (*pSchema)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + (*pSchema)[0].type = TSDB_DATA_TYPE_BINARY; + (*pSchema)[0].bytes = COMPACT_DB_RESULT_FIELD1_LEN; + strcpy((*pSchema)[0].name, "name"); + + (*pSchema)[1].type = TSDB_DATA_TYPE_BIGINT; + (*pSchema)[1].bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; + strcpy((*pSchema)[1].name, "id"); + + (*pSchema)[2].type = TSDB_DATA_TYPE_BINARY; + (*pSchema)[2].bytes = COMPACT_DB_RESULT_FIELD3_LEN; + strcpy((*pSchema)[2].name, "scope"); + + return TSDB_CODE_SUCCESS; +} + int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) { if (NULL == pRoot) { return TSDB_CODE_SUCCESS; @@ -8758,6 +8780,8 @@ int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pS case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT: case QUERY_NODE_SHOW_VARIABLES_STMT: return extractShowVariablesResultSchema(numOfCols, pSchema); + case QUERY_NODE_COMPACT_DATABASE_STMT: + return extractCompactDbResultSchema(numOfCols, pSchema); default: break; } @@ -10406,6 +10430,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { pQuery->execMode = QUERY_EXEC_MODE_LOCAL; break; case QUERY_NODE_SHOW_VARIABLES_STMT: + case QUERY_NODE_COMPACT_DATABASE_STMT: pQuery->haveResultSet = true; pQuery->execMode = QUERY_EXEC_MODE_RPC; if (NULL != pCxt->pCmdMsg) {