fix: tranform SCompactRsp to a datablock

This commit is contained in:
slzhou 2023-11-16 14:40:54 +08:00
parent 9ad86d7753
commit 88819ea608
3 changed files with 144 additions and 0 deletions

View File

@ -46,6 +46,10 @@ extern "C" {
#define SHOW_LOCAL_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE) #define SHOW_LOCAL_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
#define SHOW_LOCAL_VARIABLES_RESULT_FIELD3_LEN (TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE) #define SHOW_LOCAL_VARIABLES_RESULT_FIELD3_LEN (TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE)
#define COMPACT_DB_RESULT_COLS 3
#define COMPACT_DB_RESULT_FIELD1_LEN 32
#define COMPACT_DB_RESULT_FIELD3_LEN 128
#define SHOW_ALIVE_RESULT_COLS 1 #define SHOW_ALIVE_RESULT_COLS 1
#define BIT_FLAG_MASK(n) (1 << n) #define BIT_FLAG_MASK(n) (1 << n)

View File

@ -16,6 +16,7 @@
#include "catalog.h" #include "catalog.h"
#include "clientInt.h" #include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "cmdnodes.h"
#include "os.h" #include "os.h"
#include "query.h" #include "query.h"
#include "systable.h" #include "systable.h"
@ -541,6 +542,118 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
return code; return code;
} }
static int32_t buildCompactDbBlock(SCompactDbRsp* pRsp, SSDataBlock** block) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->info.hasVarCol = true;
pBlock->pDataBlock = taosArrayInit(COMPACT_DB_RESULT_COLS, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = COMPACT_DB_RESULT_FIELD1_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData);
infoData.info.type = TSDB_DATA_TYPE_BIGINT;
infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
taosArrayPush(pBlock->pDataBlock, &infoData);
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = COMPACT_DB_RESULT_FIELD3_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData);
blockDataEnsureCapacity(pBlock, 1);
SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1);
SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2);
char result[COMPACT_DB_RESULT_FIELD1_LEN] = {0};
char reason[COMPACT_DB_RESULT_FIELD3_LEN] = {0};
if (pRsp->bAccepted) {
STR_TO_VARSTR(result, "accepted");
colDataSetVal(pResultCol, 0, result, false);
colDataSetVal(pIdCol, 0, (void*)&pRsp->compactId, false);
STR_TO_VARSTR(reason, "success");
colDataSetVal(pReasonCol, 0, reason, false);
} else {
STR_TO_VARSTR(result, "rejected");
colDataSetVal(pResultCol, 0, result, false);
colDataSetNULL(pIdCol, 0);
STR_TO_VARSTR(reason, "compaction is ongoing");
colDataSetVal(pReasonCol, 0, reason, false);
}
pBlock->info.rows = 1;
*block = pBlock;
return TSDB_CODE_SUCCESS;
}
static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetrieveTableRsp** pRsp) {
SSDataBlock* pBlock = NULL;
int32_t code = buildCompactDbBlock(pCompactDb, &pBlock);
if (code) {
return code;
}
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
*pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) {
blockDataDestroy(pBlock);
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pRsp)->useconds = 0;
(*pRsp)->completed = 1;
(*pRsp)->precision = 0;
(*pRsp)->compressed = 0;
(*pRsp)->compLen = 0;
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data, COMPACT_DB_RESULT_COLS);
blockDataDestroy(pBlock);
if (len != rspSize - sizeof(SRetrieveTableRsp)) {
uError("buildRetriveTableRspForCompactDb error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
(uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
return TSDB_CODE_TSC_INVALID_INPUT;
}
return TSDB_CODE_SUCCESS;
}
int32_t processCompactDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
} else {
SCompactDbRsp rsp = {0};
SRetrieveTableRsp* pRes = NULL;
code = tDeserializeSCompactDbRsp(pMsg->pData, pMsg->len, &rsp);
if (TSDB_CODE_SUCCESS == code) {
code = buildRetriveTableRspForCompactDb(&rsp, &pRes);
}
if (TSDB_CODE_SUCCESS == code) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, true);
}
if (code != 0) {
taosMemoryFree(pRes);
}
}
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} else {
tsem_post(&pRequest->body.rspSem);
}
return code;
}
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) { __async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
switch (msgType) { switch (msgType) {
case TDMT_MND_CONNECT: case TDMT_MND_CONNECT:
@ -557,6 +670,8 @@ __async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
return processAlterStbRsp; return processAlterStbRsp;
case TDMT_MND_SHOW_VARIABLES: case TDMT_MND_SHOW_VARIABLES:
return processShowVariablesRsp; return processShowVariablesRsp;
case TDMT_MND_COMPACT_DB:
return processCompactDbRsp;
default: default:
return genericRspCallback; return genericRspCallback;
} }

View File

@ -8732,6 +8732,28 @@ static int32_t extractShowVariablesResultSchema(int32_t* numOfCols, SSchema** pS
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t extractCompactDbResultSchema(int32_t* numOfCols, SSchema** pSchema) {
*numOfCols = COMPACT_DB_RESULT_COLS;
*pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema));
if (NULL == (*pSchema)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pSchema)[0].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[0].bytes = COMPACT_DB_RESULT_FIELD1_LEN;
strcpy((*pSchema)[0].name, "name");
(*pSchema)[1].type = TSDB_DATA_TYPE_BIGINT;
(*pSchema)[1].bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
strcpy((*pSchema)[1].name, "id");
(*pSchema)[2].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[2].bytes = COMPACT_DB_RESULT_FIELD3_LEN;
strcpy((*pSchema)[2].name, "scope");
return TSDB_CODE_SUCCESS;
}
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) { int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) {
if (NULL == pRoot) { if (NULL == pRoot) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -8758,6 +8780,8 @@ int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pS
case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT: case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT:
case QUERY_NODE_SHOW_VARIABLES_STMT: case QUERY_NODE_SHOW_VARIABLES_STMT:
return extractShowVariablesResultSchema(numOfCols, pSchema); return extractShowVariablesResultSchema(numOfCols, pSchema);
case QUERY_NODE_COMPACT_DATABASE_STMT:
return extractCompactDbResultSchema(numOfCols, pSchema);
default: default:
break; break;
} }
@ -10406,6 +10430,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
pQuery->execMode = QUERY_EXEC_MODE_LOCAL; pQuery->execMode = QUERY_EXEC_MODE_LOCAL;
break; break;
case QUERY_NODE_SHOW_VARIABLES_STMT: case QUERY_NODE_SHOW_VARIABLES_STMT:
case QUERY_NODE_COMPACT_DATABASE_STMT:
pQuery->haveResultSet = true; pQuery->haveResultSet = true;
pQuery->execMode = QUERY_EXEC_MODE_RPC; pQuery->execMode = QUERY_EXEC_MODE_RPC;
if (NULL != pCxt->pCmdMsg) { if (NULL != pCxt->pCmdMsg) {