Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-30837

This commit is contained in:
54liuyao 2024-09-05 17:52:55 +08:00
commit 7cd34dddf5
34 changed files with 1196 additions and 941 deletions

View File

@ -158,6 +158,7 @@ extern int32_t tsCacheLazyLoadThreshold; // cost threshold for last/last_row lo
// query client // query client
extern int32_t tsQueryPolicy; extern int32_t tsQueryPolicy;
extern bool tsQueryTbNotExistAsEmpty;
extern int32_t tsQueryRspPolicy; extern int32_t tsQueryRspPolicy;
extern int64_t tsQueryMaxConcurrentTables; extern int64_t tsQueryMaxConcurrentTables;
extern int32_t tsQuerySmaOptimize; extern int32_t tsQuerySmaOptimize;

View File

@ -15,7 +15,7 @@
#ifndef TAOS_COUNTER_H #ifndef TAOS_COUNTER_H
#define TAOS_COUNTER_H #define TAOS_COUNTER_H
#include <stdint.h>
#include <stdlib.h> #include <stdlib.h>
#include "taos_metric.h" #include "taos_metric.h"
@ -99,4 +99,7 @@ int taos_counter_inc(taos_counter_t *self, const char **label_values);
*/ */
int taos_counter_add(taos_counter_t *self, double r_value, const char **label_values); int taos_counter_add(taos_counter_t *self, double r_value, const char **label_values);
int taos_counter_get_vgroup_ids(taos_counter_t *self, char ***keys, int32_t **vgroup_ids, int *list_size);
int taos_counter_get_keys_size(taos_counter_t *self);
int taos_counter_delete(taos_counter_t *self, char *key);
#endif // TAOS_COUNTER_H #endif // TAOS_COUNTER_H

View File

@ -65,6 +65,8 @@ typedef struct SParseCsvCxt {
const char* pLastSqlPos; // the location of the last parsed sql const char* pLastSqlPos; // the location of the last parsed sql
} SParseCsvCxt; } SParseCsvCxt;
typedef void(*setQueryFn)(int64_t);
typedef struct SParseContext { typedef struct SParseContext {
uint64_t requestId; uint64_t requestId;
int64_t requestRid; int64_t requestRid;
@ -98,6 +100,7 @@ typedef struct SParseContext {
void* parseSqlParam; void* parseSqlParam;
int8_t biMode; int8_t biMode;
SArray* pSubMetaList; SArray* pSubMetaList;
setQueryFn setQueryFp;
} SParseContext; } SParseContext;
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery); int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);

View File

@ -499,7 +499,7 @@ typedef enum ELogicConditionType {
#ifdef WINDOWS #ifdef WINDOWS
#define TSDB_MAX_RPC_THREADS 4 // windows pipe only support 4 connections. #define TSDB_MAX_RPC_THREADS 4 // windows pipe only support 4 connections.
#else #else
#define TSDB_MAX_RPC_THREADS 10 #define TSDB_MAX_RPC_THREADS 50
#endif #endif
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type #define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type

View File

@ -52,11 +52,11 @@ enum {
#define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE) #define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
#define SHOW_VARIABLES_RESULT_FIELD3_LEN (TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE) #define SHOW_VARIABLES_RESULT_FIELD3_LEN (TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE)
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) #define TD_RES_QUERY(res) (*(int8_t*)(res) == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) #define TD_RES_TMQ(res) (*(int8_t*)(res) == RES_TYPE__TMQ)
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META) #define TD_RES_TMQ_META(res) (*(int8_t*)(res) == RES_TYPE__TMQ_META)
#define TD_RES_TMQ_METADATA(res) (*(int8_t*)res == RES_TYPE__TMQ_METADATA) #define TD_RES_TMQ_METADATA(res) (*(int8_t*)(res) == RES_TYPE__TMQ_METADATA)
#define TD_RES_TMQ_BATCH_META(res) (*(int8_t*)res == RES_TYPE__TMQ_BATCH_META) #define TD_RES_TMQ_BATCH_META(res) (*(int8_t*)(res) == RES_TYPE__TMQ_BATCH_META)
typedef struct SAppInstInfo SAppInstInfo; typedef struct SAppInstInfo SAppInstInfo;
@ -284,6 +284,7 @@ typedef struct SRequestObj {
bool isSubReq; bool isSubReq;
bool inCallback; bool inCallback;
bool isStmtBind; // is statement bind parameter bool isStmtBind; // is statement bind parameter
bool isQuery;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry; uint32_t retry;
int64_t allocatorRefId; int64_t allocatorRefId;
@ -420,6 +421,7 @@ typedef struct SSqlCallbackWrapper {
void* pPlanInfo; void* pPlanInfo;
} SSqlCallbackWrapper; } SSqlCallbackWrapper;
void setQueryRequest(int64_t rId);
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res); SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res);
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList); int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper); void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper);

View File

@ -31,6 +31,15 @@
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo); static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo);
void setQueryRequest(int64_t rId) {
SRequestObj* pReq = acquireRequest(rId);
if (pReq != NULL) {
pReq->isQuery = true;
(void)releaseRequest(rId);
}
}
static bool stringLengthCheck(const char* str, size_t maxsize) { static bool stringLengthCheck(const char* str, size_t maxsize) {
if (str == NULL) { if (str == NULL) {
return false; return false;
@ -286,7 +295,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
.enableSysInfo = pTscObj->sysInfo, .enableSysInfo = pTscObj->sysInfo,
.svrVer = pTscObj->sVer, .svrVer = pTscObj->sVer,
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes), .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
.isStmtBind = pRequest->isStmtBind}; .isStmtBind = pRequest->isStmtBind,
.setQueryFp = setQueryRequest};
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog); int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
@ -339,8 +349,8 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0; // int64_t transporterId = 0;
TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg)); TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg));
(void)tsem_wait(&pRequest->body.rspSem); (void)tsem_wait(&pRequest->body.rspSem);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -396,8 +406,8 @@ int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
SAppInstInfo* pAppInfo = getAppInfo(pRequest); SAppInstInfo* pAppInfo = getAppInfo(pRequest);
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0; // int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg); int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg);
if (code) { if (code) {
doRequestCallback(pRequest, code); doRequestCallback(pRequest, code);
} }
@ -2953,6 +2963,10 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param
void doRequestCallback(SRequestObj* pRequest, int32_t code) { void doRequestCallback(SRequestObj* pRequest, int32_t code) {
pRequest->inCallback = true; pRequest->inCallback = true;
int64_t this = pRequest->self; int64_t this = pRequest->self;
if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery && (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) {
code = TSDB_CODE_SUCCESS;
pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
}
pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code); pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
SRequestObj* pReq = acquireRequest(this); SRequestObj* pReq = acquireRequest(this);
if (pReq != NULL) { if (pReq != NULL) {

View File

@ -296,9 +296,8 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa
pSendInfo->fp = fetchWhiteListCallbackFn; pSendInfo->fp = fetchWhiteListCallbackFn;
pSendInfo->msgType = TDMT_MND_GET_USER_WHITELIST; pSendInfo->msgType = TDMT_MND_GET_USER_WHITELIST;
int64_t transportId = 0; SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp);
SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp); if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, NULL, pSendInfo)) {
if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, &transportId, pSendInfo)) {
tscWarn("failed to async send msg to server"); tscWarn("failed to async send msg to server");
} }
releaseTscObj(connId); releaseTscObj(connId);
@ -860,9 +859,9 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
return pResInfo->pCol[columnIndex].offset; return pResInfo->pCol[columnIndex].offset;
} }
int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows){ int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows) {
if (res == NULL || result == NULL || rows == NULL || *rows <= 0 || if (res == NULL || result == NULL || rows == NULL || *rows <= 0 || columnIndex < 0 || TD_RES_TMQ_META(res) ||
columnIndex < 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { TD_RES_TMQ_BATCH_META(res)) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -875,22 +874,22 @@ int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *r
TAOS_FIELD *pField = &pResInfo->userFields[columnIndex]; TAOS_FIELD *pField = &pResInfo->userFields[columnIndex];
SResultColumn *pCol = &pResInfo->pCol[columnIndex]; SResultColumn *pCol = &pResInfo->pCol[columnIndex];
if (*rows > pResInfo->numOfRows){ if (*rows > pResInfo->numOfRows) {
*rows = pResInfo->numOfRows; *rows = pResInfo->numOfRows;
} }
if (IS_VAR_DATA_TYPE(pField->type)) { if (IS_VAR_DATA_TYPE(pField->type)) {
for(int i = 0; i < *rows; i++){ for (int i = 0; i < *rows; i++) {
if(pCol->offset[i] == -1){ if (pCol->offset[i] == -1) {
result[i] = true; result[i] = true;
}else{ } else {
result[i] = false; result[i] = false;
} }
} }
}else{ } else {
for(int i = 0; i < *rows; i++){ for (int i = 0; i < *rows; i++) {
if (colDataIsNull_f(pCol->nullbitmap, i)){ if (colDataIsNull_f(pCol->nullbitmap, i)) {
result[i] = true; result[i] = true;
}else{ } else {
result[i] = false; result[i] = false;
} }
} }
@ -1235,7 +1234,8 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt, SS
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes), .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
.allocatorId = pRequest->allocatorRefId, .allocatorId = pRequest->allocatorRefId,
.parseSqlFp = clientParseSql, .parseSqlFp = clientParseSql,
.parseSqlParam = pWrapper}; .parseSqlParam = pWrapper,
.setQueryFp = setQueryRequest};
int8_t biMode = atomic_load_8(&((STscObj *)pTscObj)->biMode); int8_t biMode = atomic_load_8(&((STscObj *)pTscObj)->biMode);
(*pCxt)->biMode = biMode; (*pCxt)->biMode = biMode;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -113,15 +113,15 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId); tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId);
} }
MonitorSlowLogData tmp = {.clusterId = p->clusterId, MonitorSlowLogData tmp = {.clusterId = p->clusterId,
.type = p->type, .type = p->type,
.fileName = p->fileName, .fileName = p->fileName,
.pFile = p->pFile, .pFile = p->pFile,
.offset = p->offset, .offset = p->offset,
.data = NULL}; .data = NULL};
if (monitorPutData2MonitorQueue(tmp) == 0) { if (monitorPutData2MonitorQueue(tmp) == 0) {
p->fileName = NULL; p->fileName = NULL;
} else { } else {
if(taosCloseFile(&(p->pFile)) != 0) { if (taosCloseFile(&(p->pFile)) != 0) {
tscError("failed to close file:%p", p->pFile); tscError("failed to close file:%p", p->pFile);
} }
} }
@ -165,8 +165,8 @@ static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITO
pInfo->requestId = tGenIdPI64(); pInfo->requestId = tGenIdPI64();
pInfo->requestObjRefId = 0; pInfo->requestObjRefId = 0;
int64_t transporterId = 0; // int64_t transporterId = 0;
return asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo); return asyncSendMsgToServer(pTransporter, epSet, NULL, pInfo);
FAILED: FAILED:
if (taosCloseFile(&(((MonitorSlowLogData*)param)->pFile)) != 0) { if (taosCloseFile(&(((MonitorSlowLogData*)param)->pFile)) != 0) {
@ -286,7 +286,7 @@ void monitorCreateClient(int64_t clusterId) {
return; return;
fail: fail:
destroyMonitorClient(&pMonitor); destroyMonitorClient(&pMonitor);
taosWUnLockLatch(&monitorLock); taosWUnLockLatch(&monitorLock);
} }
@ -302,7 +302,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys); taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
if (newCounter == NULL) return; if (newCounter == NULL) return;
MonitorClient* pMonitor = *ppMonitor; MonitorClient* pMonitor = *ppMonitor;
if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0){ if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0) {
tscError("failed to add metric to collector"); tscError("failed to add metric to collector");
(void)taos_counter_destroy(newCounter); (void)taos_counter_destroy(newCounter);
goto end; goto end;
@ -315,7 +315,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name,
newCounter); newCounter);
end: end:
taosWUnLockLatch(&monitorLock); taosWUnLockLatch(&monitorLock);
} }
@ -338,13 +338,13 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char**
tscError("monitorCounterInc not found pCounter %" PRIx64 ":%s.", clusterId, counterName); tscError("monitorCounterInc not found pCounter %" PRIx64 ":%s.", clusterId, counterName);
goto end; goto end;
} }
if (taos_counter_inc(*ppCounter, label_values) != 0){ if (taos_counter_inc(*ppCounter, label_values) != 0) {
tscError("monitorCounterInc failed to inc %" PRIx64 ":%s.", clusterId, counterName); tscError("monitorCounterInc failed to inc %" PRIx64 ":%s.", clusterId, counterName);
goto end; goto end;
} }
tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName); tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName);
end: end:
taosWUnLockLatch(&monitorLock); taosWUnLockLatch(&monitorLock);
} }
@ -413,7 +413,7 @@ static char* readFile(TdFilePtr pFile, int64_t* offset, int64_t size) {
return NULL; return NULL;
} }
if((size <= *offset)){ if ((size <= *offset)) {
tscError("invalid size:%" PRId64 ", offset:%" PRId64, size, *offset); tscError("invalid size:%" PRId64 ", offset:%" PRId64, size, *offset);
terrno = TSDB_CODE_TSC_INTERNAL_ERROR; terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
return NULL; return NULL;
@ -510,13 +510,13 @@ static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offs
} }
SEpSet ep = getEpSet_s(&pInst->mgmtEp); SEpSet ep = getEpSet_s(&pInst->mgmtEp);
char* data = readFile(pFile, offset, size); char* data = readFile(pFile, offset, size);
if(data == NULL) return terrno; if (data == NULL) return terrno;
return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName, return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName,
pInst->pTransporter, &ep); pInst->pTransporter, &ep);
} }
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset) { static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset) {
if (fileName == NULL){ if (fileName == NULL) {
return; return;
} }
int64_t size = getFileSize(*fileName); int64_t size = getFileSize(*fileName);
@ -525,10 +525,11 @@ static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, Td
tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName); tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName);
} else { } else {
int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName); int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName);
if (code == 0){ if (code == 0) {
tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log succ, clusterId:%" PRId64, clusterId); tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log succ, clusterId:%" PRId64, clusterId);
}else{ } else {
tscError("[monitor] monitorSendSlowLogAtBeginning send slow log failed, clusterId:%" PRId64 ",ret:%d", clusterId, code); tscError("[monitor] monitorSendSlowLogAtBeginning send slow log failed, clusterId:%" PRId64 ",ret:%d", clusterId,
code);
} }
*fileName = NULL; *fileName = NULL;
} }

View File

@ -1241,7 +1241,9 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pStmt->taos->pAppInfo->pTransporter, .pTransporter = pStmt->taos->pAppInfo->pTransporter,
.pStmtCb = NULL, .pStmtCb = NULL,
.pUser = pStmt->taos->user}; .pUser = pStmt->taos->user,
.setQueryFp = setQueryRequest};
ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp); ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog)); STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog));

View File

@ -552,9 +552,9 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
pMsgSendInfo->fp = tmqCommitCb; pMsgSendInfo->fp = tmqCommitCb;
pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET; pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
int64_t transporterId = 0; // int64_t transporterId = 0;
(void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); (void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, pMsgSendInfo);
if (code != 0) { if (code != 0) {
(void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
return code; return code;
@ -955,8 +955,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0; int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (code != 0) { if (code != 0) {
tscError("tmqSendHbReq asyncSendMsgToServer failed"); tscError("tmqSendHbReq asyncSendMsgToServer failed");
} }
@ -1436,8 +1435,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0; code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (code != 0) { if (code != 0) {
goto FAIL; goto FAIL;
} }
@ -2044,10 +2042,10 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
sendInfo->fp = tmqPollCb; sendInfo->fp = tmqPollCb;
sendInfo->msgType = TDMT_VND_TMQ_CONSUME; sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
int64_t transporterId = 0; // int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset); tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId, tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId,
pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId); pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
if (code != 0) { if (code != 0) {
@ -3221,8 +3219,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
sendInfo->fp = tmCommittedCb; sendInfo->fp = tmCommittedCb;
sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO; sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;
int64_t transporterId = 0; code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo);
if (code != 0) { if (code != 0) {
(void)tsem2_destroy(&pParam->sem); (void)tsem2_destroy(&pParam->sem);
taosMemoryFree(pParam); taosMemoryFree(pParam);
@ -3498,13 +3495,13 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
sendInfo->fp = tmqGetWalInfoCb; sendInfo->fp = tmqGetWalInfoCb;
sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO; sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
int64_t transporterId = 0; // int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset); tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);
tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s,QID:0x%" PRIx64, tmq->consumerId, tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s,QID:0x%" PRIx64, tmq->consumerId,
pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo);
if (code != 0) { if (code != 0) {
goto end; goto end;
} }
@ -3668,8 +3665,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
sendInfo->fp = tmqSeekCb; sendInfo->fp = tmqSeekCb;
sendInfo->msgType = TDMT_VND_TMQ_SEEK; sendInfo->msgType = TDMT_VND_TMQ_SEEK;
int64_t transporterId = 0; code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (code != 0) { if (code != 0) {
(void)tsem2_destroy(&pParam->sem); (void)tsem2_destroy(&pParam->sem);
taosMemoryFree(pParam); taosMemoryFree(pParam);

View File

@ -162,6 +162,7 @@ int32_t tmqMaxTopicNum = 20;
int32_t tmqRowSize = 4096; int32_t tmqRowSize = 4096;
// query // query
int32_t tsQueryPolicy = 1; int32_t tsQueryPolicy = 1;
bool tsQueryTbNotExistAsEmpty = false;
int32_t tsQueryRspPolicy = 0; int32_t tsQueryRspPolicy = 0;
int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT
bool tsEnableQueryHb = true; bool tsEnableQueryHb = true;
@ -569,6 +570,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN( TAOS_CHECK_RETURN(
cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, CFG_SCOPE_BOTH, CFG_DYN_CLIENT)); cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, CFG_SCOPE_BOTH, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 4, CFG_SCOPE_CLIENT, CFG_DYN_ENT_CLIENT)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 4, CFG_SCOPE_CLIENT, CFG_DYN_ENT_CLIENT));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "queryTableNotExistAsEmpty", tsQueryTbNotExistAsEmpty, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableQueryHb", tsEnableQueryHb, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableQueryHb", tsEnableQueryHb, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableScience", tsEnableScience, CFG_SCOPE_CLIENT, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableScience", tsEnableScience, CFG_SCOPE_CLIENT, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
@ -1181,6 +1183,9 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "queryPolicy"); TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "queryPolicy");
tsQueryPolicy = pItem->i32; tsQueryPolicy = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "queryTableNotExistAsEmpty");
tsQueryTbNotExistAsEmpty = pItem->bval;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "enableQueryHb"); TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "enableQueryHb");
tsEnableQueryHb = pItem->bval; tsEnableQueryHb = pItem->bval;
@ -2218,6 +2223,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
{"numOfLogLines", &tsNumOfLogLines}, {"numOfLogLines", &tsNumOfLogLines},
{"querySmaOptimize", &tsQuerySmaOptimize}, {"querySmaOptimize", &tsQuerySmaOptimize},
{"queryPolicy", &tsQueryPolicy}, {"queryPolicy", &tsQueryPolicy},
{"queryTableNotExistAsEmpty", &tsQueryTbNotExistAsEmpty},
{"queryPlannerTrace", &tsQueryPlannerTrace}, {"queryPlannerTrace", &tsQueryPlannerTrace},
{"queryNodeChunkSize", &tsQueryNodeChunkSize}, {"queryNodeChunkSize", &tsQueryNodeChunkSize},
{"queryUseNodeAllocator", &tsQueryUseNodeAllocator}, {"queryUseNodeAllocator", &tsQueryUseNodeAllocator},

File diff suppressed because it is too large Load Diff

View File

@ -23,26 +23,27 @@ extern "C" {
#endif #endif
typedef struct SDnodeMgmt { typedef struct SDnodeMgmt {
SDnodeData *pData; SDnodeData *pData;
SMsgCb msgCb; SMsgCb msgCb;
const char *path; const char *path;
const char *name; const char *name;
TdThread statusThread; TdThread statusThread;
TdThread notifyThread; TdThread notifyThread;
TdThread monitorThread; TdThread monitorThread;
TdThread auditThread; TdThread auditThread;
TdThread crashReportThread; TdThread crashReportThread;
SSingleWorker mgmtWorker; SSingleWorker mgmtWorker;
ProcessCreateNodeFp processCreateNodeFp; ProcessCreateNodeFp processCreateNodeFp;
ProcessAlterNodeTypeFp processAlterNodeTypeFp; ProcessAlterNodeTypeFp processAlterNodeTypeFp;
ProcessDropNodeFp processDropNodeFp; ProcessDropNodeFp processDropNodeFp;
SendMonitorReportFp sendMonitorReportFp; SendMonitorReportFp sendMonitorReportFp;
SendAuditRecordsFp sendAuditRecordsFp; MonitorCleanExpiredSamplesFp monitorCleanExpiredSamplesFp;
GetVnodeLoadsFp getVnodeLoadsFp; SendAuditRecordsFp sendAuditRecordsFp;
GetVnodeLoadsFp getVnodeLoadsLiteFp; GetVnodeLoadsFp getVnodeLoadsFp;
GetMnodeLoadsFp getMnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsLiteFp;
GetQnodeLoadsFp getQnodeLoadsFp; GetMnodeLoadsFp getMnodeLoadsFp;
int32_t statusSeq; GetQnodeLoadsFp getQnodeLoadsFp;
int32_t statusSeq;
} SDnodeMgmt; } SDnodeMgmt;
// dmHandle.c // dmHandle.c

View File

@ -65,6 +65,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->processAlterNodeTypeFp = pInput->processAlterNodeTypeFp; pMgmt->processAlterNodeTypeFp = pInput->processAlterNodeTypeFp;
pMgmt->processDropNodeFp = pInput->processDropNodeFp; pMgmt->processDropNodeFp = pInput->processDropNodeFp;
pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp; pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp;
pMgmt->monitorCleanExpiredSamplesFp = pInput->monitorCleanExpiredSamplesFp;
pMgmt->sendAuditRecordsFp = pInput->sendAuditRecordFp; pMgmt->sendAuditRecordsFp = pInput->sendAuditRecordFp;
pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp; pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp;
pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp; pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp;

View File

@ -168,6 +168,7 @@ static void *dmMonitorThreadFp(void *param) {
float interval = (curTime - lastTime) / 1000.0f; float interval = (curTime - lastTime) / 1000.0f;
if (interval >= tsMonitorInterval) { if (interval >= tsMonitorInterval) {
(*pMgmt->sendMonitorReportFp)(); (*pMgmt->sendMonitorReportFp)();
(*pMgmt->monitorCleanExpiredSamplesFp)();
lastTime = curTime; lastTime = curTime;
trimCount = (trimCount + 1) % TRIM_FREQ; trimCount = (trimCount + 1) % TRIM_FREQ;

View File

@ -14,8 +14,11 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "taos_monitor.h"
#include "vmInt.h" #include "vmInt.h"
extern taos_counter_t *tsInsertCounter;
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) { void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
if (pInfo->pVloads == NULL) return; if (pInfo->pVloads == NULL) return;
@ -117,6 +120,34 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
taosArrayDestroy(pVloads); taosArrayDestroy(pVloads);
} }
void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
int list_size = taos_counter_get_keys_size(tsInsertCounter);
if (list_size == 0) return;
int32_t *vgroup_ids;
char **keys;
int r = 0;
r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size);
if (r) {
dError("failed to get vgroup ids");
return;
}
(void)taosThreadRwlockRdlock(&pMgmt->lock);
for (int i = 0; i < list_size; i++) {
int32_t vgroup_id = vgroup_ids[i];
void *vnode = taosHashGet(pMgmt->hash, &vgroup_id, sizeof(int32_t));
if (vnode == NULL) {
r = taos_counter_delete(tsInsertCounter, keys[i]);
if (r) {
dError("failed to delete monitor sample key:%s", keys[i]);
}
}
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
if (vgroup_ids) taosMemoryFree(vgroup_ids);
if (keys) taosMemoryFree(keys);
return;
}
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg)); memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));

View File

@ -128,6 +128,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
// dmMonitor.c // dmMonitor.c
void dmSendMonitorReport(); void dmSendMonitorReport();
void dmMonitorCleanExpiredSamples();
void dmSendAuditRecords(); void dmSendAuditRecords();
void dmGetVnodeLoads(SMonVloadInfo *pInfo); void dmGetVnodeLoads(SMonVloadInfo *pInfo);
void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo); void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo);

View File

@ -39,6 +39,8 @@ void vmGetVnodeLoadsLite(void *pMgmt, SMonVloadInfo *pInfo);
void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo); void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo);
void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo); void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo);
void vmCleanExpriedSamples(void *pMgmt);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -409,6 +409,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
.processAlterNodeTypeFp = dmProcessAlterNodeTypeReq, .processAlterNodeTypeFp = dmProcessAlterNodeTypeReq,
.processDropNodeFp = dmProcessDropNodeReq, .processDropNodeFp = dmProcessDropNodeReq,
.sendMonitorReportFp = dmSendMonitorReport, .sendMonitorReportFp = dmSendMonitorReport,
.monitorCleanExpiredSamplesFp = dmMonitorCleanExpiredSamples,
.sendAuditRecordFp = auditSendRecordsInBatch, .sendAuditRecordFp = auditSendRecordsInBatch,
.getVnodeLoadsFp = dmGetVnodeLoads, .getVnodeLoadsFp = dmGetVnodeLoads,
.getVnodeLoadsLiteFp = dmGetVnodeLoadsLite, .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,

View File

@ -33,8 +33,8 @@ static void dmGetMonitorBasicInfoBasic(SDnode *pDnode, SMonBasicInfo *pInfo) {
} }
static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
//pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) / (86400000.0f); // pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) / (86400000.0f);
pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) /1000.0f; pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) / 1000.0f;
pInfo->has_mnode = pDnode->wrappers[MNODE].required; pInfo->has_mnode = pDnode->wrappers[MNODE].required;
pInfo->has_qnode = pDnode->wrappers[QNODE].required; pInfo->has_qnode = pDnode->wrappers[QNODE].required;
pInfo->has_snode = pDnode->wrappers[SNODE].required; pInfo->has_snode = pDnode->wrappers[SNODE].required;
@ -52,6 +52,17 @@ static void dmGetDmMonitorInfo(SDnode *pDnode) {
monSetDmInfo(&dmInfo); monSetDmInfo(&dmInfo);
} }
void dmCleanExpriedSamples(SDnode *pDnode) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE];
if (dmMarkWrapper(pWrapper) == 0) {
if (pWrapper->pMgmt != NULL) {
vmCleanExpriedSamples(pWrapper->pMgmt);
}
}
dmReleaseWrapper(pWrapper);
return;
}
static void dmGetDmMonitorInfoBasic(SDnode *pDnode) { static void dmGetDmMonitorInfoBasic(SDnode *pDnode) {
SMonDmInfo dmInfo = {0}; SMonDmInfo dmInfo = {0};
dmGetMonitorBasicInfoBasic(pDnode, &dmInfo.basic); dmGetMonitorBasicInfoBasic(pDnode, &dmInfo.basic);
@ -123,11 +134,17 @@ void dmSendMonitorReport() {
monGenAndSendReport(); monGenAndSendReport();
} }
//Todo: put this in seperate file in the future void dmMonitorCleanExpiredSamples() {
void dmSendAuditRecords() { if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return;
auditSendRecordsInBatch(); dTrace("clean monitor expired samples");
SDnode *pDnode = dmInstance();
(void)dmCleanExpriedSamples(pDnode);
} }
// Todo: put this in seperate file in the future
void dmSendAuditRecords() { auditSendRecordsInBatch(); }
void dmGetVnodeLoads(SMonVloadInfo *pInfo) { void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
SDnode *pDnode = dmInstance(); SDnode *pDnode = dmInstance();
SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE]; SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE];

View File

@ -116,6 +116,7 @@ typedef enum {
typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
typedef void (*SendMonitorReportFp)(); typedef void (*SendMonitorReportFp)();
typedef void (*MonitorCleanExpiredSamplesFp)();
typedef void (*SendAuditRecordsFp)(); typedef void (*SendAuditRecordsFp)();
typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo); typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo);
typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo); typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo);
@ -146,21 +147,22 @@ typedef struct {
} SDnodeData; } SDnodeData;
typedef struct { typedef struct {
const char *path; const char *path;
const char *name; const char *name;
STfs *pTfs; STfs *pTfs;
SDnodeData *pData; SDnodeData *pData;
SMsgCb msgCb; SMsgCb msgCb;
ProcessCreateNodeFp processCreateNodeFp; ProcessCreateNodeFp processCreateNodeFp;
ProcessAlterNodeTypeFp processAlterNodeTypeFp; ProcessAlterNodeTypeFp processAlterNodeTypeFp;
ProcessDropNodeFp processDropNodeFp; ProcessDropNodeFp processDropNodeFp;
SendMonitorReportFp sendMonitorReportFp; SendMonitorReportFp sendMonitorReportFp;
SendAuditRecordsFp sendAuditRecordFp; MonitorCleanExpiredSamplesFp monitorCleanExpiredSamplesFp;
GetVnodeLoadsFp getVnodeLoadsFp; SendAuditRecordsFp sendAuditRecordFp;
GetVnodeLoadsFp getVnodeLoadsLiteFp; GetVnodeLoadsFp getVnodeLoadsFp;
GetMnodeLoadsFp getMnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsLiteFp;
GetQnodeLoadsFp getQnodeLoadsFp; GetMnodeLoadsFp getMnodeLoadsFp;
StopDnodeFp stopDnodeFp; GetQnodeLoadsFp getQnodeLoadsFp;
StopDnodeFp stopDnodeFp;
} SMgmtInputOpt; } SMgmtInputOpt;
typedef struct { typedef struct {

View File

@ -42,7 +42,7 @@ int32_t mndPerfsInitMeta(SHashObj *hash) {
int32_t code = 0; int32_t code = 0;
STableMetaRsp meta = {0}; STableMetaRsp meta = {0};
tstrncpy(meta.dbFName, TSDB_INFORMATION_SCHEMA_DB, sizeof(meta.dbFName)); tstrncpy(meta.dbFName, TSDB_PERFORMANCE_SCHEMA_DB, sizeof(meta.dbFName));
meta.tableType = TSDB_SYSTEM_TABLE; meta.tableType = TSDB_SYSTEM_TABLE;
meta.sversion = 1; meta.sversion = 1;
meta.tversion = 1; meta.tversion = 1;

View File

@ -136,8 +136,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* pMsg, int
pMsgSendInfo->msgType = TDMT_VND_SUBMIT; pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
pMsgSendInfo->fp = inserterCallback; pMsgSendInfo->fp = inserterCallback;
int64_t transporterId = 0; return asyncSendMsgToServer(pTransporter, pEpset, NULL, pMsgSendInfo);
return asyncSendMsgToServer(pTransporter, pEpset, &transporterId, pMsgSendInfo);
} }
static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int32_t* pLen) { static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int32_t* pLen) {
@ -228,14 +227,15 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
goto _end; goto _end;
} }
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch (pColInfoData->info.type) { switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
if (pColInfoData->info.type != pCol->type) { if (pColInfoData->info.type != pCol->type) {
qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k, pCol->type); qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k,
pCol->type);
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
goto _end; goto _end;
} }
@ -462,7 +462,8 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
inserter->explain = pInserterNode->explain; inserter->explain = pInserterNode->explain;
int64_t suid = 0; int64_t suid = 0;
int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid); int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId,
&inserter->pSchema, &suid);
if (code) { if (code) {
terrno = code; terrno = code;
goto _return; goto _return;
@ -484,7 +485,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT),
false, HASH_NO_LOCK); false, HASH_NO_LOCK);
if (NULL == inserter->pCols) { if (NULL == inserter->pCols) {
goto _return; goto _return;
} }
SNode* pNode = NULL; SNode* pNode = NULL;

View File

@ -1575,8 +1575,8 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
SMetaReader mr = {0}; SMetaReader mr = {0};
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
code = doSetUserTableMetaInfo(&pAPI->metaReaderFn, &pAPI->metaFn, pInfo->readHandle.vnode, &mr, *uid, dbname, vgId, p, code = doSetUserTableMetaInfo(&pAPI->metaReaderFn, &pAPI->metaFn, pInfo->readHandle.vnode, &mr, *uid, dbname, vgId,
numOfRows, GET_TASKID(pTaskInfo)); p, numOfRows, GET_TASKID(pTaskInfo));
pAPI->metaReaderFn.clearReader(&mr); pAPI->metaReaderFn.clearReader(&mr);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
@ -2170,8 +2170,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
pMsgSendInfo->fp = loadSysTableCallback; pMsgSendInfo->fp = loadSysTableCallback;
pMsgSendInfo->requestId = pTaskInfo->id.queryId; pMsgSendInfo->requestId = pTaskInfo->id.queryId;
int64_t transporterId = 0; code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, NULL, pMsgSendInfo);
code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTaskInfo->code = code; pTaskInfo->code = code;
@ -2880,7 +2879,7 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP
code = tableListGetSize(pTableListInfo, &num); code = tableListGetSize(pTableListInfo, &num);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
void* pList = tableListGetInfo(pTableListInfo, 0); void* pList = tableListGetInfo(pTableListInfo, 0);
code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock,
(void**)&pInfo->pHandle, pTaskInfo->id.str, NULL); (void**)&pInfo->pHandle, pTaskInfo->id.str, NULL);

View File

@ -16,6 +16,7 @@
#ifndef TAOS_METRIC_FORMATTER_I_H #ifndef TAOS_METRIC_FORMATTER_I_H
#define TAOS_METRIC_FORMATTER_I_H #define TAOS_METRIC_FORMATTER_I_H
#include <stdint.h>
// Private // Private
#include "taos_metric_formatter_t.h" #include "taos_metric_formatter_t.h"
@ -57,8 +58,8 @@ int taos_metric_formatter_load_l_value(taos_metric_formatter_t *metric_formatter
/** /**
* @brief API PRIVATE Loads the formatter with a metric sample * @brief API PRIVATE Loads the formatter with a metric sample
*/ */
int taos_metric_formatter_load_sample(taos_metric_formatter_t *metric_formatter, taos_metric_sample_t *sample, int taos_metric_formatter_load_sample(taos_metric_formatter_t *metric_formatter, taos_metric_sample_t *sample, char *ts,
char *ts, char *format); char *format);
/** /**
* @brief API PRIVATE Loads a metric in the string exposition format * @brief API PRIVATE Loads a metric in the string exposition format
@ -80,4 +81,5 @@ int taos_metric_formatter_clear(taos_metric_formatter_t *self);
*/ */
char *taos_metric_formatter_dump(taos_metric_formatter_t *metric_formatter); char *taos_metric_formatter_dump(taos_metric_formatter_t *metric_formatter);
int32_t taos_metric_formatter_get_vgroup_id(char *key);
#endif // TAOS_METRIC_FORMATTER_I_H #endif // TAOS_METRIC_FORMATTER_I_H

View File

@ -20,13 +20,14 @@
#include "taos_alloc.h" #include "taos_alloc.h"
// Private // Private
#include "taos_test.h"
#include "taos_errors.h" #include "taos_errors.h"
#include "taos_log.h" #include "taos_log.h"
#include "taos_metric_formatter_i.h"
#include "taos_metric_i.h" #include "taos_metric_i.h"
#include "taos_metric_sample_i.h" #include "taos_metric_sample_i.h"
#include "taos_metric_sample_t.h" #include "taos_metric_sample_t.h"
#include "taos_metric_t.h" #include "taos_metric_t.h"
#include "taos_test.h"
taos_counter_t *taos_counter_new(const char *name, const char *help, size_t label_key_count, const char **label_keys) { taos_counter_t *taos_counter_new(const char *name, const char *help, size_t label_key_count, const char **label_keys) {
return (taos_counter_t *)taos_metric_new(TAOS_COUNTER, name, help, label_key_count, label_keys); return (taos_counter_t *)taos_metric_new(TAOS_COUNTER, name, help, label_key_count, label_keys);
@ -64,3 +65,49 @@ int taos_counter_add(taos_counter_t *self, double r_value, const char **label_va
if (sample == NULL) return 1; if (sample == NULL) return 1;
return taos_metric_sample_add(sample, r_value); return taos_metric_sample_add(sample, r_value);
} }
int taos_counter_get_keys_size(taos_counter_t *self) { return self->samples->keys->size; }
int taos_counter_get_vgroup_ids(taos_counter_t *self, char ***keys, int32_t **vgroup_ids, int *list_size) {
TAOS_TEST_PARA(self != NULL);
if (self == NULL) return 1;
if (self->type != TAOS_COUNTER) {
TAOS_LOG(TAOS_METRIC_INCORRECT_TYPE);
return 1;
}
if (self->samples == NULL) return 1;
(void)pthread_rwlock_rdlock(self->rwlock);
taos_linked_list_t *key_list = self->samples->keys;
*list_size = key_list->size;
int r = 0;
*vgroup_ids = (int32_t *)taos_malloc(*list_size * sizeof(int32_t));
if (vgroup_ids == NULL) {
(void)pthread_rwlock_unlock(self->rwlock);
return 1;
}
*keys = (char **)taos_malloc(*list_size * sizeof(char *));
if (keys == NULL) {
(void)pthread_rwlock_unlock(self->rwlock);
return 1;
}
int index = 0;
for (taos_linked_list_node_t *current_key = key_list->head; current_key != NULL; current_key = current_key->next) {
char *key = (char *)current_key->item;
int32_t vgroup_id = taos_metric_formatter_get_vgroup_id(key);
(*vgroup_ids)[index] = vgroup_id;
(*keys)[index] = key;
index++;
}
(void)pthread_rwlock_unlock(self->rwlock);
return r;
}
int taos_counter_delete(taos_counter_t *self, char *key) {
TAOS_TEST_PARA(self != NULL);
if (self == NULL) return 1;
if (self->type != TAOS_COUNTER) {
TAOS_LOG(TAOS_METRIC_INCORRECT_TYPE);
return 1;
}
return taos_map_delete(self->samples, key);
}

View File

@ -156,6 +156,21 @@ int taos_metric_formatter_load_l_value(taos_metric_formatter_t *self, const char
} }
return 0; return 0;
} }
int32_t taos_metric_formatter_get_vgroup_id(char *key) {
char *start, *end;
char vgroupid[10];
start = strstr(key, "vgroup_id=\"");
if (start) {
start += strlen("vgroup_id=\"");
end = strchr(start, '\"');
if (end) {
strncpy(vgroupid, start, end - start);
vgroupid[end - start] = '\0';
}
return strtol(vgroupid, NULL, 10);
}
return 0;
}
/* /*
int taos_metric_formatter_load_sample(taos_metric_formatter_t *self, taos_metric_sample_t *sample, int taos_metric_formatter_load_sample(taos_metric_formatter_t *self, taos_metric_sample_t *sample,
char *ts, char *format) { char *ts, char *format) {

View File

@ -4681,7 +4681,8 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinPare
pCxt, toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, &name), pCxt, toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, &name),
&(pRealTable->pMeta), true); &(pRealTable->pMeta), true);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GET_META_ERROR, tstrerror(code)); (void)generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GET_META_ERROR, tstrerror(code));
return code;
} }
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
if (TSDB_VIEW_TABLE == pRealTable->pMeta->tableType && (!pCurrSmt->tagScan || pCxt->pParseCxt->biMode)) { if (TSDB_VIEW_TABLE == pRealTable->pMeta->tableType && (!pCurrSmt->tagScan || pCxt->pParseCxt->biMode)) {
@ -6795,6 +6796,10 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect
} }
static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (pCxt->pParseCxt && pCxt->pParseCxt->setQueryFp) {
(*pCxt->pParseCxt->setQueryFp)(pCxt->pParseCxt->requestRid);
}
if (NULL == pSelect->pFromTable) { if (NULL == pSelect->pFromTable) {
return translateSelectWithoutFrom(pCxt, pSelect); return translateSelectWithoutFrom(pCxt, pSelect);
} else { } else {
@ -6919,6 +6924,10 @@ static int32_t checkSetOperLimit(STranslateContext* pCxt, SLimitNode* pLimit) {
} }
static int32_t translateSetOperator(STranslateContext* pCxt, SSetOperator* pSetOperator) { static int32_t translateSetOperator(STranslateContext* pCxt, SSetOperator* pSetOperator) {
if (pCxt->pParseCxt && pCxt->pParseCxt->setQueryFp) {
(*pCxt->pParseCxt->setQueryFp)(pCxt->pParseCxt->requestRid);
}
int32_t code = translateQuery(pCxt, pSetOperator->pLeft); int32_t code = translateQuery(pCxt, pSetOperator->pLeft);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = resetHighLevelTranslateNamespace(pCxt); code = resetHighLevelTranslateNamespace(pCxt);

View File

@ -366,9 +366,9 @@ TEST_F(ParserSelectTest, semanticCheck) {
run("SELECT t1.c1, t1.cc1 FROM t1", TSDB_CODE_PAR_INVALID_COLUMN); run("SELECT t1.c1, t1.cc1 FROM t1", TSDB_CODE_PAR_INVALID_COLUMN);
// TSDB_CODE_PAR_GET_META_ERROR // TSDB_CODE_PAR_GET_META_ERROR
run("SELECT * FROM t10", TSDB_CODE_PAR_GET_META_ERROR); run("SELECT * FROM t10", TSDB_CODE_PAR_TABLE_NOT_EXIST);
run("SELECT * FROM test.t10", TSDB_CODE_PAR_GET_META_ERROR); run("SELECT * FROM test.t10", TSDB_CODE_PAR_TABLE_NOT_EXIST);
// TSDB_CODE_PAR_TABLE_NOT_EXIST // TSDB_CODE_PAR_TABLE_NOT_EXIST
run("SELECT t2.c1 FROM t1", TSDB_CODE_PAR_TABLE_NOT_EXIST); run("SELECT t2.c1 FROM t1", TSDB_CODE_PAR_TABLE_NOT_EXIST);

View File

@ -574,7 +574,7 @@ int32_t queryProcessTableMetaRsp(void *output, char *msg, int32_t msgSize) {
goto PROCESS_META_OVER; goto PROCESS_META_OVER;
} }
if (0 != strcmp(metaRsp.dbFName, TSDB_INFORMATION_SCHEMA_DB) && if (!IS_SYS_DBNAME(metaRsp.dbFName) &&
!tIsValidSchema(metaRsp.pSchemas, metaRsp.numOfColumns, metaRsp.numOfTags)) { !tIsValidSchema(metaRsp.pSchemas, metaRsp.numOfColumns, metaRsp.numOfTags)) {
code = TSDB_CODE_TSC_INVALID_VALUE; code = TSDB_CODE_TSC_INVALID_VALUE;
goto PROCESS_META_OVER; goto PROCESS_META_OVER;

View File

@ -5238,22 +5238,20 @@ int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p,
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED; *pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t code = TSDB_CODE_SUCCESS;
SScalarParam output = {0}; SScalarParam output = {0};
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)}; SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
int32_t code = sclCreateColumnInfoData(&type, pSrc->info.rows, &output); FLT_ERR_JRET(sclCreateColumnInfoData(&type, pSrc->info.rows, &output));
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (info->scalarMode) { if (info->scalarMode) {
SArray *pList = taosArrayInit(1, POINTER_BYTES); SArray *pList = taosArrayInit(1, POINTER_BYTES);
if (NULL == pList) { if (NULL == pList) {
FLT_ERR_RET(terrno); FLT_ERR_JRET(terrno);
} }
if (NULL == taosArrayPush(pList, &pSrc)) { if (NULL == taosArrayPush(pList, &pSrc)) {
FLT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); taosArrayDestroy(pList);
FLT_ERR_JRET(terrno);
} }
code = scalarCalculate(info->sclCtx.node, pList, &output); code = scalarCalculate(info->sclCtx.node, pList, &output);
@ -5261,7 +5259,7 @@ int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p,
*p = output.columnData; *p = output.columnData;
FLT_ERR_RET(code); FLT_ERR_JRET(code);
if (output.numOfQualified == output.numOfRows) { if (output.numOfQualified == output.numOfRows) {
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED; *pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
@ -5277,11 +5275,12 @@ int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p,
output.numOfRows = pSrc->info.rows; output.numOfRows = pSrc->info.rows;
if (*p == NULL) { if (*p == NULL) {
return TSDB_CODE_APP_ERROR; fltError("filterExecute failed, column data is NULL");
FLT_ERR_JRET(TSDB_CODE_APP_ERROR);
} }
bool keepAll = false; bool keepAll = false;
FLT_ERR_RET((info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified, &keepAll)); FLT_ERR_JRET((info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified, &keepAll));
// todo this should be return during filter procedure // todo this should be return during filter procedure
if (keepAll) { if (keepAll) {
@ -5304,6 +5303,10 @@ int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p,
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return:
sclFreeParam(&output);
*p = NULL;
return code;
} }
typedef struct SClassifyConditionCxt { typedef struct SClassifyConditionCxt {

View File

@ -272,19 +272,19 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool);
} \ } \
} while (0) } while (0)
#define ASYNC_CHECK_HANDLE(exh1, id) \ #define ASYNC_CHECK_HANDLE(exh1, id) \
do { \ do { \
if (id > 0) { \ if (id > 0) { \
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \ SExHandle* exh2 = transAcquireExHandle(transGetSvrRefMgt(), id); \
if (exh2 == NULL || id != exh2->refId) { \ if (exh2 == NULL || id != exh2->refId) { \
tDebug("ref:%" PRId64 " already released" PRIu64, id); \ tDebug("ref:%" PRId64 " already released", id); \
code = terrno; \ code = terrno; \
goto _return1; \ goto _return1; \
} \ } \
} else { \ } else { \
tWarn("invalid handle to release"); \ tDebug("invalid handle to release"); \
goto _return2; \ goto _return2; \
} \ } \
} while (0) } while (0)
int32_t transInitBuffer(SConnBuffer* buf); int32_t transInitBuffer(SConnBuffer* buf);
@ -443,6 +443,7 @@ int32_t transReleaseExHandle(int32_t refMgt, int64_t refId);
void transDestroyExHandle(void* handle); void transDestroyExHandle(void* handle);
int32_t transGetRefMgt(); int32_t transGetRefMgt();
int32_t transGetSvrRefMgt();
int32_t transGetInstMgt(); int32_t transGetInstMgt();
int32_t transGetSyncMsgMgt(); int32_t transGetSyncMsgMgt();

View File

@ -20,6 +20,7 @@
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static int32_t refMgt; static int32_t refMgt;
static int32_t svrRefMgt;
static int32_t instMgt; static int32_t instMgt;
static int32_t transSyncMsgMgt; static int32_t transSyncMsgMgt;
@ -704,12 +705,14 @@ bool transEpSetIsEqual2(SEpSet* a, SEpSet* b) {
static void transInitEnv() { static void transInitEnv() {
refMgt = transOpenRefMgt(50000, transDestroyExHandle); refMgt = transOpenRefMgt(50000, transDestroyExHandle);
svrRefMgt = transOpenRefMgt(50000, transDestroyExHandle);
instMgt = taosOpenRef(50, rpcCloseImpl); instMgt = taosOpenRef(50, rpcCloseImpl);
transSyncMsgMgt = taosOpenRef(50, transDestroySyncMsg); transSyncMsgMgt = taosOpenRef(50, transDestroySyncMsg);
(void)uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"); (void)uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
} }
static void transDestroyEnv() { static void transDestroyEnv() {
transCloseRefMgt(refMgt); transCloseRefMgt(refMgt);
transCloseRefMgt(svrRefMgt);
transCloseRefMgt(instMgt); transCloseRefMgt(instMgt);
transCloseRefMgt(transSyncMsgMgt); transCloseRefMgt(transSyncMsgMgt);
} }
@ -724,6 +727,7 @@ int32_t transInit() {
} }
int32_t transGetRefMgt() { return refMgt; } int32_t transGetRefMgt() { return refMgt; }
int32_t transGetSvrRefMgt() { return svrRefMgt; }
int32_t transGetInstMgt() { return instMgt; } int32_t transGetInstMgt() { return instMgt; }
int32_t transGetSyncMsgMgt() { return transSyncMsgMgt; } int32_t transGetSyncMsgMgt() { return transSyncMsgMgt; }

View File

@ -373,6 +373,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
STrans* pTransInst = pConn->pTransInst; STrans* pTransInst = pConn->pTransInst;
SWorkThrd* pThrd = pConn->hostThrd; SWorkThrd* pThrd = pConn->hostThrd;
int8_t acquire = 0;
STransMsgHead* pHead = NULL; STransMsgHead* pHead = NULL;
int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1; int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1;
@ -459,7 +460,13 @@ static bool uvHandleReq(SSvrConn* pConn) {
// 2. once send out data, cli conn released to conn pool immediately // 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist // 3. not mixed with persist
transMsg.info.ahandle = (void*)pHead->ahandle; transMsg.info.ahandle = (void*)pHead->ahandle;
transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId);
if (pHead->noResp == 1) {
transMsg.info.handle = NULL;
} else {
transMsg.info.handle = (void*)transAcquireExHandle(transGetSvrRefMgt(), pConn->refId);
acquire = 1;
}
transMsg.info.refId = pConn->refId; transMsg.info.refId = pConn->refId;
transMsg.info.traceId = pHead->traceId; transMsg.info.traceId = pHead->traceId;
transMsg.info.cliVer = htonl(pHead->compatibilityVer); transMsg.info.cliVer = htonl(pHead->compatibilityVer);
@ -468,10 +475,10 @@ static bool uvHandleReq(SSvrConn* pConn) {
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn, tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn,
pConn->refId); pConn->refId);
if (transMsg.info.handle == NULL) { // if (transMsg.info.handle == NULL) {
tError("%s handle %p conn:%p handle failed to init" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn); // tError("%s handle %p conn:%p handle failed to init" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn);
return false; // return false;
} // }
if (pHead->noResp == 1) { if (pHead->noResp == 1) {
transMsg.info.refId = -1; transMsg.info.refId = -1;
@ -483,7 +490,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
pConnInfo->clientPort = pConn->port; pConnInfo->clientPort = pConn->port;
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user)); tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));
(void)transReleaseExHandle(transGetRefMgt(), pConn->refId); if (acquire) transReleaseExHandle(transGetSvrRefMgt(), pConn->refId);
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
return true; return true;
@ -770,15 +777,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
SExHandle* exh1 = transMsg.info.handle; SExHandle* exh1 = transMsg.info.handle;
int64_t refId = transMsg.info.refId; int64_t refId = transMsg.info.refId;
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId); SExHandle* exh2 = transAcquireExHandle(transGetSvrRefMgt(), refId);
if (exh2 == NULL || exh1 != exh2) { if (exh2 == NULL || exh1 != exh2) {
tTrace("handle except msg %p, ignore it", exh1); tTrace("handle except msg %p, ignore it", exh1);
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
destroySmsg(msg); destroySmsg(msg);
continue; continue;
} }
msg->pConn = exh1->handle; msg->pConn = exh1->handle;
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
(*transAsyncHandle[msg->type])(msg, pThrd); (*transAsyncHandle[msg->type])(msg, pThrd);
} }
} }
@ -874,15 +881,15 @@ static void uvPrepareCb(uv_prepare_t* handle) {
SExHandle* exh1 = transMsg.info.handle; SExHandle* exh1 = transMsg.info.handle;
int64_t refId = transMsg.info.refId; int64_t refId = transMsg.info.refId;
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId); SExHandle* exh2 = transAcquireExHandle(transGetSvrRefMgt(), refId);
if (exh2 == NULL || exh1 != exh2) { if (exh2 == NULL || exh1 != exh2) {
tTrace("handle except msg %p, ignore it", exh1); tTrace("handle except msg %p, ignore it", exh1);
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
destroySmsg(msg); destroySmsg(msg);
continue; continue;
} }
msg->pConn = exh1->handle; msg->pConn = exh1->handle;
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
(*transAsyncHandle[msg->type])(msg, pThrd); (*transAsyncHandle[msg->type])(msg, pThrd);
} }
} }
@ -1215,14 +1222,14 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
exh->handle = pConn; exh->handle = pConn;
exh->pThrd = pThrd; exh->pThrd = pThrd;
exh->refId = transAddExHandle(transGetRefMgt(), exh); exh->refId = transAddExHandle(transGetSvrRefMgt(), exh);
if (exh->refId < 0) { if (exh->refId < 0) {
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end);
} }
QUEUE_INIT(&exh->q); QUEUE_INIT(&exh->q);
SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId); SExHandle* pSelf = transAcquireExHandle(transGetSvrRefMgt(), exh->refId);
if (pSelf != exh) { if (pSelf != exh) {
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end);
} }
@ -1284,8 +1291,8 @@ static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn) {
} }
static int32_t reallocConnRef(SSvrConn* conn) { static int32_t reallocConnRef(SSvrConn* conn) {
if (conn->refId > 0) { if (conn->refId > 0) {
(void)transReleaseExHandle(transGetRefMgt(), conn->refId); (void)transReleaseExHandle(transGetSvrRefMgt(), conn->refId);
(void)transRemoveExHandle(transGetRefMgt(), conn->refId); (void)transRemoveExHandle(transGetSvrRefMgt(), conn->refId);
} }
// avoid app continue to send msg on invalid handle // avoid app continue to send msg on invalid handle
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
@ -1295,14 +1302,14 @@ static int32_t reallocConnRef(SSvrConn* conn) {
exh->handle = conn; exh->handle = conn;
exh->pThrd = conn->hostThrd; exh->pThrd = conn->hostThrd;
exh->refId = transAddExHandle(transGetRefMgt(), exh); exh->refId = transAddExHandle(transGetSvrRefMgt(), exh);
if (exh->refId < 0) { if (exh->refId < 0) {
taosMemoryFree(exh); taosMemoryFree(exh);
return TSDB_CODE_REF_INVALID_ID; return TSDB_CODE_REF_INVALID_ID;
} }
QUEUE_INIT(&exh->q); QUEUE_INIT(&exh->q);
SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId); SExHandle* pSelf = transAcquireExHandle(transGetSvrRefMgt(), exh->refId);
if (pSelf != exh) { if (pSelf != exh) {
tError("conn %p failed to acquire handle", conn); tError("conn %p failed to acquire handle", conn);
taosMemoryFree(exh); taosMemoryFree(exh);
@ -1321,8 +1328,8 @@ static void uvDestroyConn(uv_handle_t* handle) {
} }
SWorkThrd* thrd = conn->hostThrd; SWorkThrd* thrd = conn->hostThrd;
(void)transReleaseExHandle(transGetRefMgt(), conn->refId); (void)transReleaseExHandle(transGetSvrRefMgt(), conn->refId);
(void)transRemoveExHandle(transGetRefMgt(), conn->refId); (void)transRemoveExHandle(transGetSvrRefMgt(), conn->refId);
STrans* pTransInst = thrd->pTransInst; STrans* pTransInst = thrd->pTransInst;
tDebug("%s conn %p destroy", transLabel(pTransInst), conn); tDebug("%s conn %p destroy", transLabel(pTransInst), conn);
@ -1752,15 +1759,15 @@ int32_t transReleaseSrvHandle(void* handle) {
tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m); destroySmsg(m);
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return code; return code;
} }
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return 0; return 0;
_return1: _return1:
tDebug("handle %p failed to send to release handle", exh); tDebug("handle %p failed to send to release handle", exh);
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return code; return code;
_return2: _return2:
tDebug("handle %p failed to send to release handle", exh); tDebug("handle %p failed to send to release handle", exh);
@ -1803,17 +1810,17 @@ int32_t transSendResponse(const STransMsg* msg) {
tGDebug("conn %p start to send resp (1/2)", exh->handle); tGDebug("conn %p start to send resp (1/2)", exh->handle);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m); destroySmsg(m);
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return code; return code;
} }
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return 0; return 0;
_return1: _return1:
tDebug("handle %p failed to send resp", exh); tDebug("handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont); rpcFreeCont(msg->pCont);
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return code; return code;
_return2: _return2:
tDebug("handle %p failed to send resp", exh); tDebug("handle %p failed to send resp", exh);
@ -1848,17 +1855,17 @@ int32_t transRegisterMsg(const STransMsg* msg) {
tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle); tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m); destroySmsg(m);
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return code; return code;
} }
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return 0; return 0;
_return1: _return1:
tDebug("handle %p failed to register brokenlink", exh); tDebug("handle %p failed to register brokenlink", exh);
rpcFreeCont(msg->pCont); rpcFreeCont(msg->pCont);
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return code; return code;
_return2: _return2:
tDebug("handle %p failed to register brokenlink", exh); tDebug("handle %p failed to register brokenlink", exh);