[td-11818] Add async send callback mechanism.
This commit is contained in:
parent
067116330b
commit
0d0d4652bb
|
@ -37,7 +37,7 @@ typedef struct SQueryExecMetric {
|
|||
int64_t rsp; // receive response from server
|
||||
} SQueryExecMetric;
|
||||
|
||||
typedef struct SInstanceActivity {
|
||||
typedef struct SInstanceSummary {
|
||||
uint64_t numOfInsertsReq;
|
||||
uint64_t numOfInsertRows;
|
||||
uint64_t insertElapsedTime;
|
||||
|
@ -48,7 +48,7 @@ typedef struct SInstanceActivity {
|
|||
uint64_t numOfSlowQueries;
|
||||
uint64_t totalRequests;
|
||||
uint64_t currentRequests; // the number of SRequestObj
|
||||
} SInstanceActivity;
|
||||
} SInstanceSummary;
|
||||
|
||||
typedef struct SHeartBeatInfo {
|
||||
void *pTimer; // timer, used to send request msg to mnode
|
||||
|
@ -57,7 +57,7 @@ typedef struct SHeartBeatInfo {
|
|||
typedef struct SAppInstInfo {
|
||||
int64_t numOfConns;
|
||||
SCorEpSet mgmtEp;
|
||||
SInstanceActivity summary;
|
||||
SInstanceSummary summary;
|
||||
SList *pConnList; // STscObj linked list
|
||||
uint32_t clusterId;
|
||||
void *pTransporter;
|
||||
|
@ -100,16 +100,16 @@ typedef struct SReqResultInfo {
|
|||
uint32_t current;
|
||||
} SReqResultInfo;
|
||||
|
||||
typedef struct SReqMsg {
|
||||
void *pMsg;
|
||||
typedef struct SDataBuf {
|
||||
void *pData;
|
||||
uint32_t len;
|
||||
} SReqMsgInfo;
|
||||
} SDataBuf;
|
||||
|
||||
typedef struct SRequestSendRecvBody {
|
||||
tsem_t rspSem; // not used now
|
||||
void* fp;
|
||||
int64_t execId; // showId/queryId
|
||||
SReqMsgInfo requestMsg;
|
||||
SDataBuf requestMsg;
|
||||
SReqResultInfo resInfo;
|
||||
} SRequestSendRecvBody;
|
||||
|
||||
|
@ -121,26 +121,38 @@ typedef struct SRequestObj {
|
|||
STscObj *pTscObj;
|
||||
SQueryExecMetric metric;
|
||||
char *sqlstr; // sql string
|
||||
SRequestSendRecvBody body;
|
||||
SRequestSendRecvBody body;
|
||||
int64_t self;
|
||||
char *msgBuf;
|
||||
int32_t code;
|
||||
void *pInfo; // sql parse info, generated by parser module
|
||||
int32_t code;
|
||||
} SRequestObj;
|
||||
|
||||
typedef struct SRequestMsgBody {
|
||||
int32_t msgType;
|
||||
SReqMsgInfo msgInfo;
|
||||
SDataBuf msgInfo;
|
||||
uint64_t requestId;
|
||||
uint64_t requestObjRefId;
|
||||
} SRequestMsgBody;
|
||||
|
||||
extern SAppInfo appInfo;
|
||||
extern int32_t tscReqRef;
|
||||
extern int32_t tscConnRef;
|
||||
typedef int (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
|
||||
|
||||
SRequestMsgBody buildRequestMsgImpl(SRequestObj *pRequest);
|
||||
extern int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(SRequestObj *pRequest, const char* pMsg, int32_t msgLen);
|
||||
typedef struct SMsgSendInfo {
|
||||
__async_send_cb_fn_t fp;
|
||||
void *param;
|
||||
uint64_t requestId;
|
||||
uint64_t requestObjRefId;
|
||||
int32_t msgType;
|
||||
SDataBuf msgInfo;
|
||||
} SMsgSendInfo;
|
||||
|
||||
extern SAppInfo appInfo;
|
||||
extern int32_t msgObjRefPool;
|
||||
extern int32_t clientConnRefPool;
|
||||
|
||||
SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest);
|
||||
int genericExecCallback(void* param, const SDataBuf* pMsg, int32_t code);
|
||||
extern int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(void* param, const SDataBuf* pMsg, int32_t code);
|
||||
|
||||
int taos_init();
|
||||
|
||||
|
|
|
@ -13,10 +13,10 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <catalog.h>
|
||||
#include "os.h"
|
||||
#include "catalog.h"
|
||||
#include "clientInt.h"
|
||||
#include "clientLog.h"
|
||||
#include "os.h"
|
||||
#include "query.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tcache.h"
|
||||
|
@ -32,26 +32,26 @@
|
|||
#define TSC_VAR_RELEASED 0
|
||||
|
||||
SAppInfo appInfo;
|
||||
int32_t tscReqRef = -1;
|
||||
int32_t tscConnRef = -1;
|
||||
int32_t msgObjRefPool = -1;
|
||||
int32_t clientConnRefPool = -1;
|
||||
|
||||
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
|
||||
volatile int32_t tscInitRes = 0;
|
||||
|
||||
static void registerRequest(SRequestObj* pRequest) {
|
||||
STscObj *pTscObj = (STscObj *)taosAcquireRef(tscConnRef, pRequest->pTscObj->id);
|
||||
STscObj *pTscObj = (STscObj *)taosAcquireRef(clientConnRefPool, pRequest->pTscObj->id);
|
||||
assert(pTscObj != NULL);
|
||||
|
||||
// connection has been released already, abort creating request.
|
||||
pRequest->self = taosAddRef(tscReqRef, pRequest);
|
||||
pRequest->self = taosAddRef(msgObjRefPool, pRequest);
|
||||
|
||||
int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);
|
||||
|
||||
if (pTscObj->pAppInfo) {
|
||||
SInstanceActivity *pActivity = &pTscObj->pAppInfo->summary;
|
||||
SInstanceSummary *pSummary = &pTscObj->pAppInfo->summary;
|
||||
|
||||
int32_t total = atomic_add_fetch_32(&pActivity->totalRequests, 1);
|
||||
int32_t currentInst = atomic_add_fetch_32(&pActivity->currentRequests, 1);
|
||||
int32_t total = atomic_add_fetch_32(&pSummary->totalRequests, 1);
|
||||
int32_t currentInst = atomic_add_fetch_32(&pSummary->currentRequests, 1);
|
||||
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d", pRequest->self,
|
||||
pRequest->pTscObj->id, num, currentInst, total);
|
||||
}
|
||||
|
@ -61,13 +61,13 @@ static void deregisterRequest(SRequestObj* pRequest) {
|
|||
assert(pRequest != NULL);
|
||||
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
SInstanceActivity* pActivity = &pTscObj->pAppInfo->summary;
|
||||
SInstanceSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||
|
||||
int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1);
|
||||
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
|
||||
|
||||
tscDebug("0x%"PRIx64" free Request from connObj: 0x%"PRIx64", current:%d, app current:%d", pRequest->self, pTscObj->id, num, currentInst);
|
||||
taosReleaseRef(tscConnRef, pTscObj->id);
|
||||
taosReleaseRef(clientConnRefPool, pTscObj->id);
|
||||
}
|
||||
|
||||
static void tscInitLogFile() {
|
||||
|
@ -150,7 +150,7 @@ void* createTscObj(const char* user, const char* auth, const char *db, SAppInstI
|
|||
}
|
||||
|
||||
pthread_mutex_init(&pObj->mutex, NULL);
|
||||
pObj->id = taosAddRef(tscConnRef, pObj);
|
||||
pObj->id = taosAddRef(clientConnRefPool, pObj);
|
||||
|
||||
tscDebug("connObj created, 0x%"PRIx64, pObj->id);
|
||||
return pObj;
|
||||
|
@ -173,7 +173,6 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty
|
|||
pRequest->type = type;
|
||||
pRequest->pTscObj = pObj;
|
||||
pRequest->body.fp = fp;
|
||||
// pRequest->body.requestMsg. = param;
|
||||
pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
|
||||
tsem_init(&pRequest->body.rspSem, 0, 0);
|
||||
|
||||
|
@ -202,7 +201,7 @@ void destroyRequest(SRequestObj* pRequest) {
|
|||
return;
|
||||
}
|
||||
|
||||
taosReleaseRef(tscReqRef, pRequest->self);
|
||||
taosReleaseRef(msgObjRefPool, pRequest->self);
|
||||
}
|
||||
|
||||
void taos_init_imp(void) {
|
||||
|
@ -238,8 +237,8 @@ void taos_init_imp(void) {
|
|||
|
||||
initTaskQueue();
|
||||
|
||||
tscConnRef = taosOpenRef(200, destroyTscObj);
|
||||
tscReqRef = taosOpenRef(40960, doDestroyRequest);
|
||||
clientConnRefPool = taosOpenRef(200, destroyTscObj);
|
||||
msgObjRefPool = taosOpenRef(40960, doDestroyRequest);
|
||||
|
||||
taosGetAppName(appInfo.appName, NULL);
|
||||
appInfo.pid = taosGetPId();
|
||||
|
|
|
@ -11,10 +11,10 @@
|
|||
#include "parser.h"
|
||||
|
||||
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
|
||||
static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody);
|
||||
static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody);
|
||||
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
|
||||
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
|
||||
|
||||
static int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId);
|
||||
static int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
|
||||
|
||||
static bool stringLengthCheck(const char* str, size_t maxsize) {
|
||||
if (str == NULL) {
|
||||
|
@ -163,10 +163,11 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
|||
int32_t code = qParseQuerySql(&cxt, &pQuery);
|
||||
if (qIsDclQuery(pQuery)) {
|
||||
SDclStmtInfo* pDcl = (SDclStmtInfo*) pQuery;
|
||||
pRequest->type = pDcl->msgType;
|
||||
pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = pDcl->pMsg, .len = pDcl->msgLen};
|
||||
|
||||
SRequestMsgBody body = buildRequestMsgImpl(pRequest);
|
||||
pRequest->type = pDcl->msgType;
|
||||
pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen};
|
||||
|
||||
SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest);
|
||||
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
|
||||
|
||||
if (pDcl->msgType == TSDB_MSG_TYPE_CREATE_TABLE) {
|
||||
|
@ -180,7 +181,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
|||
return pRequest;
|
||||
}
|
||||
|
||||
SCreateTableMsg* pMsg = body.msgInfo.pMsg;
|
||||
SCreateTableMsg* pMsg = body->msgInfo.pData;
|
||||
|
||||
SName t = {0};
|
||||
tNameFromString(&t, pMsg->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE);
|
||||
|
@ -200,14 +201,14 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
|||
tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i]));
|
||||
}
|
||||
|
||||
sendMsgToServer(pTscObj->pTransporter, &ep, &body, &transporterId);
|
||||
sendMsgToServer(pTscObj->pTransporter, &ep, &transporterId, body);
|
||||
} else {
|
||||
int64_t transporterId = 0;
|
||||
sendMsgToServer(pTscObj->pTransporter, pEpSet, &body, &transporterId);
|
||||
sendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body);
|
||||
}
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
destroyRequestMsgBody(&body);
|
||||
destroySendMsgInfo(body);
|
||||
}
|
||||
|
||||
tfree(cxt.ctx.db);
|
||||
|
@ -270,14 +271,13 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SRequestMsgBody body = {0};
|
||||
buildConnectMsg(pRequest, &body);
|
||||
SMsgSendInfo* body = buildConnectMsg(pRequest);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);
|
||||
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
destroyRequestMsgBody(&body);
|
||||
destroySendMsgInfo(body);
|
||||
|
||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
|
||||
|
@ -294,15 +294,25 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
|
|||
return pTscObj;
|
||||
}
|
||||
|
||||
static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
|
||||
pMsgBody->msgType = TSDB_MSG_TYPE_CONNECT;
|
||||
pMsgBody->msgInfo.len = sizeof(SConnectMsg);
|
||||
pMsgBody->requestObjRefId = pRequest->self;
|
||||
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
|
||||
SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
|
||||
if (pMsgSendInfo == NULL) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pMsgSendInfo->msgType = TSDB_MSG_TYPE_CONNECT;
|
||||
pMsgSendInfo->msgInfo.len = sizeof(SConnectMsg);
|
||||
pMsgSendInfo->requestObjRefId = pRequest->self;
|
||||
pMsgSendInfo->requestId = pRequest->requestId;
|
||||
pMsgSendInfo->fp = handleRequestRspFp[pMsgSendInfo->msgType];
|
||||
pMsgSendInfo->param = pRequest;
|
||||
|
||||
SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg));
|
||||
if (pConnect == NULL) {
|
||||
tfree(pMsgSendInfo);
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STscObj *pObj = pRequest->pTscObj;
|
||||
|
@ -315,84 +325,78 @@ static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody)
|
|||
pConnect->startTime = htobe64(appInfo.startTime);
|
||||
tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));
|
||||
|
||||
pMsgBody->msgInfo.pMsg = pConnect;
|
||||
return 0;
|
||||
pMsgSendInfo->msgInfo.pData = pConnect;
|
||||
return pMsgSendInfo;
|
||||
}
|
||||
|
||||
static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody) {
|
||||
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
|
||||
assert(pMsgBody != NULL);
|
||||
tfree(pMsgBody->msgInfo.pMsg);
|
||||
tfree(pMsgBody->msgInfo.pData);
|
||||
tfree(pMsgBody);
|
||||
}
|
||||
|
||||
int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId) {
|
||||
char *pMsg = rpcMallocCont(pBody->msgInfo.len);
|
||||
int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
|
||||
char *pMsg = rpcMallocCont(pInfo->msgInfo.len);
|
||||
if (NULL == pMsg) {
|
||||
tscError("0x%"PRIx64" msg:%s malloc failed", pBody->requestId, taosMsg[pBody->msgType]);
|
||||
tscError("0x%"PRIx64" msg:%s malloc failed", pInfo->requestId, taosMsg[pInfo->msgType]);
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(pMsg, pBody->msgInfo.pMsg, pBody->msgInfo.len);
|
||||
memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len);
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = pBody->msgType,
|
||||
.msgType = pInfo->msgType,
|
||||
.pCont = pMsg,
|
||||
.contLen = pBody->msgInfo.len,
|
||||
.ahandle = (void*) pBody->requestObjRefId,
|
||||
.contLen = pInfo->msgInfo.len,
|
||||
.ahandle = (void*) pInfo,
|
||||
.handle = NULL,
|
||||
.code = 0
|
||||
};
|
||||
|
||||
assert(pInfo->fp != NULL);
|
||||
|
||||
rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||
int64_t requestRefId = (int64_t)pMsg->ahandle;
|
||||
SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle;
|
||||
assert(pMsg->ahandle != NULL);
|
||||
|
||||
SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(tscReqRef, requestRefId);
|
||||
if (pRequest == NULL) {
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
return;
|
||||
}
|
||||
if (pSendInfo->requestObjRefId != 0) {
|
||||
SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(msgObjRefPool, pSendInfo->requestObjRefId);
|
||||
assert(pRequest->self == pSendInfo->requestObjRefId);
|
||||
|
||||
assert(pRequest->self == requestRefId);
|
||||
pRequest->metric.rsp = taosGetTimestampMs();
|
||||
pRequest->metric.rsp = taosGetTimestampMs();
|
||||
pRequest->code = pMsg->code;
|
||||
|
||||
pRequest->code = pMsg->code;
|
||||
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
if (pEpSet) {
|
||||
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
|
||||
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* There is not response callback function for submit response.
|
||||
* The actual inserted number of points is the first number.
|
||||
*/
|
||||
if (pMsg->code == TSDB_CODE_SUCCESS) {
|
||||
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%"PRId64 " ms", pRequest->requestId, taosMsg[pMsg->msgType],
|
||||
tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start);
|
||||
if (handleRequestRspFp[pRequest->type]) {
|
||||
char *p = malloc(pMsg->contLen);
|
||||
if (p == NULL) {
|
||||
pRequest->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
terrno = pRequest->code;
|
||||
} else {
|
||||
memcpy(p, pMsg->pCont, pMsg->contLen);
|
||||
pMsg->code = (*handleRequestRspFp[pRequest->type])(pRequest, p, pMsg->contLen);
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
if (pEpSet) {
|
||||
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
|
||||
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%"PRId64" ms", pRequest->requestId, taosMsg[pMsg->msgType],
|
||||
tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start);
|
||||
|
||||
/*
|
||||
* There is not response callback function for submit response.
|
||||
* The actual inserted number of points is the first number.
|
||||
*/
|
||||
if (pMsg->code == TSDB_CODE_SUCCESS) {
|
||||
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%" PRId64 " ms", pRequest->requestId,
|
||||
taosMsg[pMsg->msgType], tstrerror(pMsg->code), pMsg->contLen,
|
||||
pRequest->metric.rsp - pRequest->metric.start);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%" PRId64 " ms", pRequest->requestId,
|
||||
taosMsg[pMsg->msgType], tstrerror(pMsg->code), pMsg->contLen,
|
||||
pRequest->metric.rsp - pRequest->metric.start);
|
||||
}
|
||||
|
||||
taosReleaseRef(msgObjRefPool, pSendInfo->requestObjRefId);
|
||||
}
|
||||
|
||||
taosReleaseRef(tscReqRef, requestRefId);
|
||||
SDataBuf buf = {.pData = pMsg->pCont, .len = pMsg->contLen};
|
||||
pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
|
||||
sem_post(&pRequest->body.rspSem);
|
||||
}
|
||||
|
||||
TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {
|
||||
|
@ -429,14 +433,14 @@ void* doFetchRow(SRequestObj* pRequest) {
|
|||
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
|
||||
pRequest->type = TSDB_MSG_TYPE_SHOW_RETRIEVE;
|
||||
|
||||
SRequestMsgBody body = buildRequestMsgImpl(pRequest);
|
||||
SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);
|
||||
int64_t transporterId = 0;
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
destroyRequestMsgBody(&body);
|
||||
destroySendMsgInfo(body);
|
||||
|
||||
pResultInfo->current = 0;
|
||||
if (pResultInfo->numOfRows <= pResultInfo->current) {
|
||||
|
|
|
@ -35,14 +35,14 @@ void taos_cleanup(void) {
|
|||
return;
|
||||
}
|
||||
|
||||
int32_t id = tscReqRef;
|
||||
tscReqRef = -1;
|
||||
int32_t id = msgObjRefPool;
|
||||
msgObjRefPool = -1;
|
||||
taosCloseRef(id);
|
||||
|
||||
cleanupTaskQueue();
|
||||
|
||||
id = tscConnRef;
|
||||
tscConnRef = -1;
|
||||
id = clientConnRefPool;
|
||||
clientConnRefPool = -1;
|
||||
taosCloseRef(id);
|
||||
|
||||
rpcCleanup();
|
||||
|
@ -72,7 +72,7 @@ void taos_close(TAOS* taos) {
|
|||
STscObj *pTscObj = (STscObj *)taos;
|
||||
tscDebug("0x%"PRIx64" try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs);
|
||||
|
||||
taosRemoveRef(tscConnRef, pTscObj->id);
|
||||
taosRemoveRef(clientConnRefPool, pTscObj->id);
|
||||
}
|
||||
|
||||
int taos_errno(TAOS_RES *tres) {
|
||||
|
@ -130,7 +130,7 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
return taos_query_l(taos, sql, strlen(sql));
|
||||
return taos_query_l(taos, sql, (int32_t) strlen(sql));
|
||||
}
|
||||
|
||||
TAOS_ROW taos_fetch_row(TAOS_RES *pRes) {
|
||||
|
@ -140,7 +140,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *pRes) {
|
|||
|
||||
SRequestObj *pRequest = (SRequestObj *) pRes;
|
||||
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
|
||||
pRequest->type == TSDB_SQL_INSERT) {
|
||||
pRequest->type == TSDB_SQL_INSERT || pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,45 +18,23 @@
|
|||
#include "tname.h"
|
||||
#include "clientInt.h"
|
||||
#include "clientLog.h"
|
||||
#include "tmsgtype.h"
|
||||
#include "trpc.h"
|
||||
|
||||
int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(SRequestObj *pRequest, const char* pMsg, int32_t msgLen);
|
||||
int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(void*, const SDataBuf* pMsg, int32_t code);
|
||||
|
||||
int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
|
||||
pMsgBody->msgType = TSDB_MSG_TYPE_CONNECT;
|
||||
pMsgBody->msgInfo.len = sizeof(SConnectMsg);
|
||||
pMsgBody->requestObjRefId = pRequest->self;
|
||||
|
||||
SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg));
|
||||
if (pConnect == NULL) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// TODO refactor full_name
|
||||
char *db; // ugly code to move the space
|
||||
|
||||
STscObj *pObj = pRequest->pTscObj;
|
||||
pthread_mutex_lock(&pObj->mutex);
|
||||
db = strstr(pObj->db, TS_PATH_DELIMITER);
|
||||
|
||||
db = (db == NULL) ? pObj->db : db + 1;
|
||||
tstrncpy(pConnect->db, db, sizeof(pConnect->db));
|
||||
pthread_mutex_unlock(&pObj->mutex);
|
||||
|
||||
pConnect->pid = htonl(appInfo.pid);
|
||||
pConnect->startTime = htobe64(appInfo.startTime);
|
||||
tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));
|
||||
|
||||
pMsgBody->msgInfo.pMsg = pConnect;
|
||||
int genericExecCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
SRequestObj* pRequest = param;
|
||||
pRequest->code = code;
|
||||
sem_post(&pRequest->body.rspSem);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
|
||||
int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
SRequestObj* pRequest = param;
|
||||
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
|
||||
SConnectRsp *pConnect = (SConnectRsp *)pMsg;
|
||||
SConnectRsp *pConnect = (SConnectRsp *)pMsg->pData;
|
||||
pConnect->acctId = htonl(pConnect->acctId);
|
||||
pConnect->connId = htonl(pConnect->connId);
|
||||
pConnect->clusterId = htonl(pConnect->clusterId);
|
||||
|
@ -81,15 +59,19 @@ int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
|
|||
pTscObj->pAppInfo->clusterId = pConnect->clusterId;
|
||||
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
|
||||
|
||||
pRequest->body.resInfo.pRspMsg = pMsg;
|
||||
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
|
||||
tscDebug("0x%" PRIx64 " clusterId:%d, totalConn:%"PRId64, pRequest->requestId, pConnect->clusterId, pTscObj->pAppInfo->numOfConns);
|
||||
|
||||
sem_post(&pRequest->body.rspSem);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
|
||||
pMsgBody->msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
|
||||
pMsgBody->msgInfo.len = sizeof(SRetrieveTableMsg);
|
||||
pMsgBody->requestObjRefId = pRequest->self;
|
||||
static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SMsgSendInfo* pMsgSendInfo) {
|
||||
pMsgSendInfo->msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
|
||||
pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableMsg);
|
||||
pMsgSendInfo->requestObjRefId = pRequest->self;
|
||||
pMsgSendInfo->param = pRequest;
|
||||
pMsgSendInfo->fp = handleRequestRspFp[pMsgSendInfo->msgType];
|
||||
|
||||
SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg));
|
||||
if (pRetrieveMsg == NULL) {
|
||||
|
@ -97,29 +79,38 @@ static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SRequestMsgBody* pMs
|
|||
}
|
||||
|
||||
pRetrieveMsg->showId = htonl(pRequest->body.execId);
|
||||
pMsgBody->msgInfo.pMsg = pRetrieveMsg;
|
||||
pMsgSendInfo->msgInfo.pData = pRetrieveMsg;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SRequestMsgBody buildRequestMsgImpl(SRequestObj *pRequest) {
|
||||
SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) {
|
||||
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
|
||||
|
||||
if (pRequest->type == TSDB_MSG_TYPE_SHOW_RETRIEVE) {
|
||||
SRequestMsgBody body = {0};
|
||||
buildRetrieveMnodeMsg(pRequest, &body);
|
||||
return body;
|
||||
buildRetrieveMnodeMsg(pRequest, pMsgSendInfo);
|
||||
} else {
|
||||
assert(pRequest != NULL);
|
||||
SRequestMsgBody body = {
|
||||
.requestObjRefId = pRequest->self,
|
||||
.msgInfo = pRequest->body.requestMsg,
|
||||
.msgType = pRequest->type,
|
||||
.requestId = pRequest->requestId,
|
||||
};
|
||||
return body;
|
||||
pMsgSendInfo->requestObjRefId = pRequest->self;
|
||||
pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
|
||||
pMsgSendInfo->msgType = pRequest->type;
|
||||
pMsgSendInfo->requestId = pRequest->requestId;
|
||||
pMsgSendInfo->param = pRequest;
|
||||
|
||||
pMsgSendInfo->fp = (handleRequestRspFp[pRequest->type] == NULL)? genericExecCallback:handleRequestRspFp[pRequest->type];
|
||||
}
|
||||
|
||||
return pMsgSendInfo;
|
||||
}
|
||||
|
||||
int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
|
||||
SShowRsp* pShow = (SShowRsp *)pMsg;
|
||||
int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
SRequestObj* pRequest = param;
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pRequest->code = code;
|
||||
tsem_post(&pRequest->body.rspSem);
|
||||
return code;
|
||||
}
|
||||
|
||||
SShowRsp* pShow = (SShowRsp *)pMsg->pData;
|
||||
pShow->showId = htonl(pShow->showId);
|
||||
|
||||
STableMetaMsg *pMetaMsg = &(pShow->tableMeta);
|
||||
|
@ -140,7 +131,7 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen)
|
|||
pFields[i].bytes = pSchema[i].bytes;
|
||||
}
|
||||
|
||||
pRequest->body.resInfo.pRspMsg = pMsg;
|
||||
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
|
||||
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
||||
|
||||
pResInfo->fields = pFields;
|
||||
|
@ -150,16 +141,18 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen)
|
|||
pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
|
||||
|
||||
pRequest->body.execId = pShow->showId;
|
||||
tsem_post(&pRequest->body.rspSem);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t processRetrieveMnodeRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
|
||||
assert(msgLen >= sizeof(SRetrieveTableRsp));
|
||||
int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
assert(pMsg->len >= sizeof(SRetrieveTableRsp));
|
||||
|
||||
tfree(pRequest->body.resInfo.pRspMsg);
|
||||
pRequest->body.resInfo.pRspMsg = pMsg;
|
||||
SRequestObj* pRequest = param;
|
||||
// tfree(pRequest->body.resInfo.pRspMsg);
|
||||
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
|
||||
|
||||
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg;
|
||||
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg->pData;
|
||||
pRetrieve->numOfRows = htonl(pRetrieve->numOfRows);
|
||||
pRetrieve->precision = htons(pRetrieve->precision);
|
||||
|
||||
|
@ -172,29 +165,42 @@ int32_t processRetrieveMnodeRsp(SRequestObj *pRequest, const char* pMsg, int32_t
|
|||
|
||||
tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRetrieve->numOfRows,
|
||||
pRetrieve->completed, pRequest->body.execId);
|
||||
|
||||
tsem_post(&pRequest->body.rspSem);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t processCreateDbRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
|
||||
int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
// todo rsp with the vnode id list
|
||||
SRequestObj* pRequest = param;
|
||||
tsem_post(&pRequest->body.rspSem);
|
||||
}
|
||||
|
||||
int32_t processUseDbRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
|
||||
SUseDbRsp* pUseDbRsp = (SUseDbRsp*) pMsg;
|
||||
int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
SUseDbRsp* pUseDbRsp = (SUseDbRsp*) pMsg->pData;
|
||||
SName name = {0};
|
||||
tNameFromString(&name, pUseDbRsp->db, T_NAME_ACCT|T_NAME_DB);
|
||||
|
||||
char db[TSDB_DB_NAME_LEN] = {0};
|
||||
tNameGetDbName(&name, db);
|
||||
|
||||
SRequestObj* pRequest = param;
|
||||
setConnectionDB(pRequest->pTscObj, db);
|
||||
|
||||
tsem_post(&pRequest->body.rspSem);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t processCreateTableRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
|
||||
int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
assert(pMsg != NULL);
|
||||
SRequestObj* pRequest = param;
|
||||
tsem_post(&pRequest->body.rspSem);
|
||||
}
|
||||
|
||||
int32_t processDropDbRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
|
||||
int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
// todo: Remove cache in catalog cache.
|
||||
SRequestObj* pRequest = param;
|
||||
tsem_post(&pRequest->body.rspSem);
|
||||
}
|
||||
|
||||
void initMsgHandleFp() {
|
||||
|
|
|
@ -49,54 +49,54 @@ int main(int argc, char** argv) {
|
|||
|
||||
TEST(testCase, driverInit_Test) { taos_init(); }
|
||||
|
||||
TEST(testCase, connect_Test) {
|
||||
TEST(testCase, connect_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
//TEST(testCase, create_user_Test) {
|
||||
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, create_user_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
|
||||
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, create_account_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
|
||||
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, drop_account_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "drop account aabc");
|
||||
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, show_user_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
|
||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
// }
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
//
|
||||
//TEST(testCase, create_account_Test) {
|
||||
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
|
||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
// }
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
//
|
||||
//TEST(testCase, drop_account_Test) {
|
||||
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "drop account aabc");
|
||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
// }
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
|
||||
TEST(testCase, show_user_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "show users");
|
||||
TAOS_ROW pRow = NULL;
|
||||
|
@ -113,7 +113,7 @@ TEST(testCase, driverInit_Test) { taos_init(); }
|
|||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, drop_user_Test) {
|
||||
TEST(testCase, drop_user_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
|
@ -126,7 +126,7 @@ TEST(testCase, driverInit_Test) { taos_init(); }
|
|||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, show_db_Test) {
|
||||
TEST(testCase, show_db_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
|
||||
|
@ -191,10 +191,15 @@ TEST(testCase, drop_db_test) {
|
|||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to drop db, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
|
||||
showDB(pConn);
|
||||
|
||||
pRes = taos_query(pConn, "create database abc1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("create to drop db, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
|
@ -248,7 +253,19 @@ TEST(testCase, show_stable_Test) {
|
|||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "show stables");
|
||||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to use db, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "show stables");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to show stables, reason:%s\n", taos_errstr(pRes));
|
||||
taos_free_result(pRes);
|
||||
ASSERT_TRUE(false);
|
||||
}
|
||||
|
||||
TAOS_ROW pRow = NULL;
|
||||
|
||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
SCreateUserMsg* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
|
||||
SCreateAcctMsg* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
|
||||
SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
|
||||
SShowMsg* buildShowMsg(SShowInfo* pShowInfo, int64_t id, char* msgBuf, int32_t msgLen);
|
||||
SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx* pParseCtx, char* msgBuf, int32_t msgLen);
|
||||
SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf);
|
||||
SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
|
||||
SDropTableMsg* buildDropTableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
|
||||
|
|
|
@ -86,7 +86,7 @@ SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t *msgLen, int64_t id, cha
|
|||
return pMsg;
|
||||
}
|
||||
|
||||
SShowMsg* buildShowMsg(SShowInfo* pShowInfo, int64_t id, char* msgBuf, int32_t msgLen) {
|
||||
SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, char* msgBuf, int32_t msgLen) {
|
||||
SShowMsg* pShowMsg = calloc(1, sizeof(SShowMsg));
|
||||
|
||||
pShowMsg->type = pShowInfo->showType;
|
||||
|
@ -105,6 +105,12 @@ SShowMsg* buildShowMsg(SShowInfo* pShowInfo, int64_t id, char* msgBuf, int32_t m
|
|||
pShowMsg->payloadLen = htons(pEpAddr->n);
|
||||
}
|
||||
|
||||
if (pShowInfo->showType == TSDB_MGMT_TABLE_STB || pShowInfo->showType == TSDB_MGMT_TABLE_VGROUP) {
|
||||
SName n = {0};
|
||||
tNameSetDbName(&n, pCtx->acctId, pCtx->db, strlen(pCtx->db));
|
||||
tNameGetFullDbName(&n, pShowMsg->db);
|
||||
}
|
||||
|
||||
return pShowMsg;
|
||||
}
|
||||
|
||||
|
|
|
@ -4090,7 +4090,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, void** ou
|
|||
}
|
||||
}
|
||||
|
||||
*output = buildShowMsg(pShowInfo, pCtx->requestId, pMsgBuf->buf, pMsgBuf->len);
|
||||
*output = buildShowMsg(pShowInfo, pCtx, pMsgBuf->buf, pMsgBuf->len);
|
||||
*outputLen = sizeof(SShowMsg)/* + htons(pShowMsg->payloadLen)*/;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue