Merge pull request #13289 from taosdata/feature/qnode
feat: scheduler based on load
This commit is contained in:
commit
d85457833f
|
@ -791,19 +791,24 @@ typedef struct {
|
|||
int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
|
||||
int32_t tDeserializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
|
||||
|
||||
typedef struct SQueryNodeAddr {
|
||||
int32_t nodeId; // vgId or qnodeId
|
||||
SEpSet epSet;
|
||||
} SQueryNodeAddr;
|
||||
|
||||
typedef struct {
|
||||
SArray* addrsList; // SArray<SQueryNodeAddr>
|
||||
SQueryNodeAddr addr;
|
||||
uint64_t load;
|
||||
} SQueryNodeLoad;
|
||||
|
||||
typedef struct {
|
||||
SArray* qnodeList; // SArray<SQueryNodeLoad>
|
||||
} SQnodeListRsp;
|
||||
|
||||
int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
|
||||
int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
|
||||
void tFreeSQnodeListRsp(SQnodeListRsp* pRsp);
|
||||
|
||||
typedef struct SQueryNodeAddr {
|
||||
int32_t nodeId; // vgId or qnodeId
|
||||
SEpSet epSet;
|
||||
} SQueryNodeAddr;
|
||||
|
||||
typedef struct {
|
||||
SArray* pArray; // Array of SUseDbRsp
|
||||
} SUseDbBatchRsp;
|
||||
|
@ -926,6 +931,21 @@ typedef struct {
|
|||
int32_t syncState;
|
||||
} SMnodeLoad;
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
int64_t numOfProcessedQuery;
|
||||
int64_t numOfProcessedCQuery;
|
||||
int64_t numOfProcessedFetch;
|
||||
int64_t numOfProcessedDrop;
|
||||
int64_t numOfProcessedHb;
|
||||
int64_t cacheDataSize;
|
||||
int64_t numOfQueryInQueue;
|
||||
int64_t numOfFetchInQueue;
|
||||
int64_t timeInQueryQueue;
|
||||
int64_t timeInFetchQueue;
|
||||
} SQnodeLoad;
|
||||
|
||||
|
||||
typedef struct {
|
||||
int32_t sver; // software version
|
||||
int64_t dnodeVer; // dnode table version in sdb
|
||||
|
@ -937,6 +957,7 @@ typedef struct {
|
|||
int32_t numOfSupportVnodes;
|
||||
char dnodeEp[TSDB_EP_LEN];
|
||||
SMnodeLoad mload;
|
||||
SQnodeLoad qload;
|
||||
SClusterCfg clusterCfg;
|
||||
SArray* pVloads; // array of SVnodeLoad
|
||||
} SStatusReq;
|
||||
|
@ -1946,6 +1967,7 @@ typedef struct {
|
|||
int8_t killConnection;
|
||||
int8_t align[3];
|
||||
SEpSet epSet;
|
||||
SArray *pQnodeList;
|
||||
} SQueryHbRspBasic;
|
||||
|
||||
typedef struct {
|
||||
|
@ -2025,7 +2047,10 @@ static FORCE_INLINE void tFreeClientKv(void* pKv) {
|
|||
|
||||
static FORCE_INLINE void tFreeClientHbRsp(void* pRsp) {
|
||||
SClientHbRsp* rsp = (SClientHbRsp*)pRsp;
|
||||
taosMemoryFreeClear(rsp->query);
|
||||
if (rsp->query) {
|
||||
taosArrayDestroy(rsp->query->pQnodeList);
|
||||
taosMemoryFreeClear(rsp->query);
|
||||
}
|
||||
if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv);
|
||||
}
|
||||
|
||||
|
|
|
@ -253,6 +253,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MON_BM_INFO, "monitor-binfo", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MON_VM_LOAD, "monitor-vload", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MON_MM_LOAD, "monitor-mload", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MON_QM_LOAD, "monitor-qload", NULL, NULL)
|
||||
|
||||
#if defined(TD_MSG_NUMBER_)
|
||||
TDMT_MAX
|
||||
|
|
|
@ -25,20 +25,6 @@ extern "C" {
|
|||
/* ------------------------ TYPES EXPOSED ------------------------ */
|
||||
typedef struct SQnode SQnode;
|
||||
|
||||
typedef struct {
|
||||
int64_t numOfProcessedQuery;
|
||||
int64_t numOfProcessedCQuery;
|
||||
int64_t numOfProcessedFetch;
|
||||
int64_t numOfProcessedDrop;
|
||||
int64_t memSizeInCache;
|
||||
int64_t dataSizeSend;
|
||||
int64_t dataSizeRecv;
|
||||
int64_t numOfQueryInQueue;
|
||||
int64_t numOfFetchInQueue;
|
||||
int64_t waitTimeInQueryQUeue;
|
||||
int64_t waitTimeInFetchQUeue;
|
||||
} SQnodeLoad;
|
||||
|
||||
typedef struct {
|
||||
SMsgCb msgCb;
|
||||
} SQnodeOpt;
|
||||
|
|
|
@ -32,6 +32,10 @@ extern "C" {
|
|||
struct SDataSink;
|
||||
struct SSDataBlock;
|
||||
|
||||
typedef struct SDataSinkStat {
|
||||
uint64_t cachedSize;
|
||||
} SDataSinkStat;
|
||||
|
||||
typedef struct SDataSinkMgtCfg {
|
||||
uint32_t maxDataBlockNum; // todo: this should be numOfRows?
|
||||
uint32_t maxDataBlockNumPerQuery;
|
||||
|
@ -62,6 +66,8 @@ typedef struct SOutputData {
|
|||
*/
|
||||
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle);
|
||||
|
||||
int32_t dsDataSinkGetCacheSize(SDataSinkStat *pStat);
|
||||
|
||||
/**
|
||||
* Put the result set returned by the executor into datasinker.
|
||||
* @param handle
|
||||
|
@ -88,6 +94,8 @@ void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd);
|
|||
*/
|
||||
int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput);
|
||||
|
||||
int32_t dsGetCacheSize(DataSinkHandle handle, uint64_t *pSize);
|
||||
|
||||
/**
|
||||
* After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue.
|
||||
* @param ahandle
|
||||
|
|
|
@ -171,6 +171,7 @@ void tFreeSMonVmInfo(SMonVmInfo *pInfo);
|
|||
typedef struct {
|
||||
SMonSysInfo sys;
|
||||
SMonLogs log;
|
||||
SQnodeLoad load;
|
||||
} SMonQmInfo;
|
||||
|
||||
int32_t tSerializeSMonQmInfo(void *buf, int32_t bufLen, SMonQmInfo *pInfo);
|
||||
|
@ -210,6 +211,10 @@ typedef struct {
|
|||
int32_t tSerializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInfo);
|
||||
int32_t tDeserializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInfo);
|
||||
|
||||
int32_t tSerializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo);
|
||||
int32_t tDeserializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo);
|
||||
|
||||
|
||||
typedef struct {
|
||||
const char *server;
|
||||
uint16_t port;
|
||||
|
|
|
@ -22,7 +22,7 @@ extern "C" {
|
|||
|
||||
#include "tmsgcb.h"
|
||||
#include "trpc.h"
|
||||
|
||||
#include "executor.h"
|
||||
|
||||
enum {
|
||||
NODE_TYPE_VNODE = 1,
|
||||
|
@ -40,13 +40,19 @@ typedef struct SQWorkerCfg {
|
|||
} SQWorkerCfg;
|
||||
|
||||
typedef struct {
|
||||
uint64_t numOfStartTask;
|
||||
uint64_t numOfStopTask;
|
||||
uint64_t numOfRecvedFetch;
|
||||
uint64_t numOfSentHb;
|
||||
uint64_t numOfSentFetch;
|
||||
uint64_t numOfTaskInQueue;
|
||||
uint64_t cacheDataSize;
|
||||
|
||||
uint64_t queryProcessed;
|
||||
uint64_t cqueryProcessed;
|
||||
uint64_t fetchProcessed;
|
||||
uint64_t dropProcessed;
|
||||
uint64_t hbProcessed;
|
||||
|
||||
uint64_t numOfQueryInQueue;
|
||||
uint64_t numOfFetchInQueue;
|
||||
uint64_t timeInQueryQueue;
|
||||
uint64_t timeInFetchQueue;
|
||||
|
||||
uint64_t numOfErrors;
|
||||
} SQWorkerStat;
|
||||
|
||||
|
@ -68,7 +74,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
|
|||
|
||||
void qWorkerDestroy(void **qWorkerMgmt);
|
||||
|
||||
int64_t qWorkerGetWaitTimeInQueue(void *qWorkerMgmt, EQueueType type);
|
||||
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -119,6 +119,8 @@ typedef struct SHeartBeatInfo {
|
|||
struct SAppInstInfo {
|
||||
int64_t numOfConns;
|
||||
SCorEpSet mgmtEp;
|
||||
TdThreadMutex qnodeMutex;
|
||||
SArray* pQnodeList;
|
||||
SInstanceSummary summary;
|
||||
SList* pConnList; // STscObj linked list
|
||||
uint64_t clusterId;
|
||||
|
@ -290,7 +292,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen);
|
|||
|
||||
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
|
||||
|
||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
|
||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray** pNodeList);
|
||||
|
||||
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
|
||||
|
||||
|
@ -317,6 +319,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
|
|||
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList);
|
||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** res);
|
||||
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
|
||||
int32_t updateQnodeList(SAppInstInfo*pInfo, SArray* pNodeList);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -160,6 +160,10 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
|||
taos_close(pTscObj);
|
||||
}
|
||||
|
||||
if (pRsp->query->pQnodeList) {
|
||||
updateQnodeList(pTscObj->pAppInfo, pRsp->query->pQnodeList);
|
||||
}
|
||||
|
||||
releaseTscObj(pRsp->connKey.tscRid);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -117,7 +117,8 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
|
|||
SAppInstInfo* p = NULL;
|
||||
if (pInst == NULL) {
|
||||
p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
|
||||
p->mgmtEp = epSet;
|
||||
p->mgmtEp = epSet;
|
||||
taosThreadMutexInit(&p->qnodeMutex, NULL);
|
||||
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
|
||||
p->pAppHbMgr = appHbMgrInit(p, key);
|
||||
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
|
||||
|
@ -228,7 +229,61 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
|
||||
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
|
||||
SQueryNodeLoad *node1 = (SQueryNodeLoad *)elem1;
|
||||
SQueryNodeLoad *node2 = (SQueryNodeLoad *)elem2;
|
||||
|
||||
if (node1->load < node2->load) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return node1->load > node2->load;
|
||||
}
|
||||
|
||||
int32_t updateQnodeList(SAppInstInfo*pInfo, SArray* pNodeList) {
|
||||
taosThreadMutexLock(&pInfo->qnodeMutex);
|
||||
if (pInfo->pQnodeList) {
|
||||
taosArrayDestroy(pInfo->pQnodeList);
|
||||
pInfo->pQnodeList = NULL;
|
||||
}
|
||||
|
||||
if (pNodeList) {
|
||||
pInfo->pQnodeList = taosArrayDup(pNodeList);
|
||||
taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
|
||||
}
|
||||
taosThreadMutexUnlock(&pInfo->qnodeMutex);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
|
||||
SAppInstInfo*pInfo = pRequest->pTscObj->pAppInfo;
|
||||
int32_t code = 0;
|
||||
|
||||
taosThreadMutexLock(&pInfo->qnodeMutex);
|
||||
if (pInfo->pQnodeList) {
|
||||
*pNodeList = taosArrayDup(pInfo->pQnodeList);
|
||||
}
|
||||
taosThreadMutexUnlock(&pInfo->qnodeMutex);
|
||||
|
||||
if (NULL == *pNodeList) {
|
||||
SEpSet mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
|
||||
SCatalog* pCatalog = NULL;
|
||||
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
|
||||
code = catalogGetQnodeList(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, *pNodeList);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && *pNodeList) {
|
||||
code = updateQnodeList(pInfo, *pNodeList);
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray** pNodeList) {
|
||||
pRequest->type = pQuery->msgType;
|
||||
SPlanContext cxt = {.queryId = pRequest->requestId,
|
||||
.acctId = pRequest->pTscObj->acctId,
|
||||
|
@ -237,14 +292,10 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
|
|||
.showRewrite = pQuery->showRewrite,
|
||||
.pMsg = pRequest->msgBuf,
|
||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
|
||||
SEpSet mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
|
||||
SCatalog* pCatalog = NULL;
|
||||
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||
|
||||
int32_t code = getQnodeList(pRequest, pNodeList);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = catalogGetQnodeList(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, pNodeList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
|
||||
code = qCreateQueryPlan(&cxt, pPlan, *pNodeList);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -369,8 +420,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
|||
}
|
||||
|
||||
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList) {
|
||||
*pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
|
||||
return getPlan(pRequest, pQuery, &pRequest->body.pDag, *pNodeList);
|
||||
return getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList);
|
||||
}
|
||||
|
||||
int32_t validateSversion(SRequestObj* pRequest, void* res) {
|
||||
|
@ -456,8 +506,8 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
|
|||
code = execDdlQuery(pRequest, pQuery);
|
||||
break;
|
||||
case QUERY_EXEC_MODE_SCHEDULE: {
|
||||
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
|
||||
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList);
|
||||
SArray* pNodeList = NULL;
|
||||
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, &pNodeList);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList, &pRes);
|
||||
if (NULL != pRes) {
|
||||
|
|
|
@ -147,12 +147,25 @@ int32_t tEncodeSQueryNodeAddr(SEncoder *pEncoder, SQueryNodeAddr *pAddr) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSQueryNodeLoad(SEncoder *pEncoder, SQueryNodeLoad *pLoad) {
|
||||
if (tEncodeSQueryNodeAddr(pEncoder, &pLoad->addr) < 0) return -1;
|
||||
if (tEncodeU64(pEncoder, pLoad->load) < 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDecodeSQueryNodeAddr(SDecoder *pDecoder, SQueryNodeAddr *pAddr) {
|
||||
if (tDecodeI32(pDecoder, &pAddr->nodeId) < 0) return -1;
|
||||
if (tDecodeSEpSet(pDecoder, &pAddr->epSet) < 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDecodeSQueryNodeLoad(SDecoder *pDecoder, SQueryNodeLoad *pLoad) {
|
||||
if (tDecodeSQueryNodeAddr(pDecoder, &pLoad->addr) < 0) return -1;
|
||||
if (tDecodeU64(pDecoder, &pLoad->load) < 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI8(buf, pEp->inUse);
|
||||
|
@ -304,6 +317,12 @@ static int32_t tSerializeSClientHbRsp(SEncoder *pEncoder, const SClientHbRsp *pR
|
|||
if (tEncodeI32(pEncoder, pRsp->query->onlineDnodes) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pRsp->query->killConnection) < 0) return -1;
|
||||
if (tEncodeSEpSet(pEncoder, &pRsp->query->epSet) < 0) return -1;
|
||||
int32_t num = taosArrayGetSize(pRsp->query->pQnodeList);
|
||||
if (tEncodeI32(pEncoder, num) < 0) return -1;
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SQueryNodeLoad *pLoad = taosArrayGet(pRsp->query->pQnodeList, i);
|
||||
if (tEncodeSQueryNodeLoad(pEncoder, pLoad) < 0) return -1;
|
||||
}
|
||||
} else {
|
||||
if (tEncodeI32(pEncoder, queryNum) < 0) return -1;
|
||||
}
|
||||
|
@ -333,6 +352,15 @@ static int32_t tDeserializeSClientHbRsp(SDecoder *pDecoder, SClientHbRsp *pRsp)
|
|||
if (tDecodeI32(pDecoder, &pRsp->query->onlineDnodes) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pRsp->query->killConnection) < 0) return -1;
|
||||
if (tDecodeSEpSet(pDecoder, &pRsp->query->epSet) < 0) return -1;
|
||||
int32_t pQnodeNum = 0;
|
||||
if (tDecodeI32(pDecoder, &pQnodeNum) < 0) return -1;
|
||||
if (pQnodeNum > 0) {
|
||||
pRsp->query->pQnodeList = taosArrayInit(pQnodeNum, sizeof(SQueryNodeLoad));
|
||||
if (NULL == pRsp->query->pQnodeList) return -1;
|
||||
SQueryNodeLoad load = {0};
|
||||
if (tDecodeSQueryNodeLoad(pDecoder, &load) < 0) return -1;
|
||||
taosArrayPush(pRsp->query->pQnodeList, &load);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t kvNum = 0;
|
||||
|
@ -898,6 +926,18 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
|||
// mnode loads
|
||||
if (tEncodeI32(&encoder, pReq->mload.syncState) < 0) return -1;
|
||||
|
||||
if (tEncodeI32(&encoder, pReq->qload.dnodeId) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedQuery) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedCQuery) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedFetch) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedDrop) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedHb) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->qload.cacheDataSize) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->qload.numOfQueryInQueue) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->qload.numOfFetchInQueue) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->qload.timeInQueryQueue) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->qload.timeInFetchQueue) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -955,6 +995,18 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
|||
|
||||
if (tDecodeI32(&decoder, &pReq->mload.syncState) < 0) return -1;
|
||||
|
||||
if (tDecodeI32(&decoder, &pReq->qload.dnodeId) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedQuery) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedCQuery) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedFetch) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedDrop) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedHb) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->qload.cacheDataSize) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->qload.numOfQueryInQueue) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->qload.numOfFetchInQueue) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->qload.timeInQueryQueue) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->qload.timeInFetchQueue) < 0) return -1;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
|
@ -1921,11 +1973,11 @@ int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp)
|
|||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
int32_t num = taosArrayGetSize(pRsp->addrsList);
|
||||
int32_t num = taosArrayGetSize(pRsp->qnodeList);
|
||||
if (tEncodeI32(&encoder, num) < 0) return -1;
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SQueryNodeAddr *addr = taosArrayGet(pRsp->addrsList, i);
|
||||
if (tEncodeSQueryNodeAddr(&encoder, addr) < 0) return -1;
|
||||
SQueryNodeLoad *pLoad = taosArrayGet(pRsp->qnodeList, i);
|
||||
if (tEncodeSQueryNodeLoad(&encoder, pLoad) < 0) return -1;
|
||||
}
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -1941,15 +1993,15 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp
|
|||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
int32_t num = 0;
|
||||
if (tDecodeI32(&decoder, &num) < 0) return -1;
|
||||
if (NULL == pRsp->addrsList) {
|
||||
pRsp->addrsList = taosArrayInit(num, sizeof(SQueryNodeAddr));
|
||||
if (NULL == pRsp->addrsList) return -1;
|
||||
if (NULL == pRsp->qnodeList) {
|
||||
pRsp->qnodeList = taosArrayInit(num, sizeof(SQueryNodeLoad));
|
||||
if (NULL == pRsp->qnodeList) return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SQueryNodeAddr addr = {0};
|
||||
if (tDecodeSQueryNodeAddr(&decoder, &addr) < 0) return -1;
|
||||
taosArrayPush(pRsp->addrsList, &addr);
|
||||
SQueryNodeLoad load = {0};
|
||||
if (tDecodeSQueryNodeLoad(&decoder, &load) < 0) return -1;
|
||||
taosArrayPush(pRsp->qnodeList, &load);
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
|
||||
|
@ -1957,7 +2009,7 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tFreeSQnodeListRsp(SQnodeListRsp *pRsp) { taosArrayDestroy(pRsp->addrsList); }
|
||||
void tFreeSQnodeListRsp(SQnodeListRsp *pRsp) { taosArrayDestroy(pRsp->qnodeList); }
|
||||
|
||||
int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq) {
|
||||
SEncoder encoder = {0};
|
||||
|
|
|
@ -35,6 +35,7 @@ typedef struct SDnodeMgmt {
|
|||
SendMonitorReportFp sendMonitorReportFp;
|
||||
GetVnodeLoadsFp getVnodeLoadsFp;
|
||||
GetMnodeLoadsFp getMnodeLoadsFp;
|
||||
GetQnodeLoadsFp getQnodeLoadsFp;
|
||||
} SDnodeMgmt;
|
||||
|
||||
// dmHandle.c
|
||||
|
@ -58,4 +59,4 @@ void dmStopWorker(SDnodeMgmt *pMgmt);
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DND_QNODE_INT_H_*/
|
||||
#endif /*_TD_DND_QNODE_INT_H_*/
|
||||
|
|
|
@ -79,6 +79,8 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
|||
(*pMgmt->getMnodeLoadsFp)(&minfo);
|
||||
req.mload = minfo.load;
|
||||
|
||||
(*pMgmt->getQnodeLoadsFp)(&req.qload);
|
||||
|
||||
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
|
||||
void *pHead = rpcMallocCont(contLen);
|
||||
tSerializeSStatusReq(pHead, contLen, &req);
|
||||
|
|
|
@ -48,6 +48,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
|||
pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp;
|
||||
pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp;
|
||||
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;
|
||||
pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp;
|
||||
|
||||
if (dmStartWorker(pMgmt) != 0) {
|
||||
return -1;
|
||||
|
|
|
@ -20,6 +20,14 @@ void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {
|
|||
SQnodeLoad qload = {0};
|
||||
qndGetLoad(pMgmt->pQnode, &qload);
|
||||
|
||||
qload.dnodeId = pMgmt->pData->dnodeId;
|
||||
|
||||
}
|
||||
|
||||
void qmGetQnodeLoads(SQnodeMgmt *pMgmt, SQnodeLoad *pInfo) {
|
||||
qndGetLoad(pMgmt->pQnode, pInfo);
|
||||
|
||||
pInfo->dnodeId = pMgmt->pData->dnodeId;
|
||||
}
|
||||
|
||||
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
|
|
|
@ -104,7 +104,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
|||
|
||||
dTrace("msg:%p, get from vnode-write queue", pMsg);
|
||||
if (taosArrayPush(pArray, &pMsg) == NULL) {
|
||||
dTrace("msg:%p, failed to process since %s", pMsg, terrstr());
|
||||
dTrace("msg:%p, failed to push to array since %s", pMsg, terrstr());
|
||||
vmSendRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -168,6 +168,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
|||
void dmSendMonitorReport();
|
||||
void dmGetVnodeLoads(SMonVloadInfo *pInfo);
|
||||
void dmGetMnodeLoads(SMonMloadInfo *pInfo);
|
||||
void dmGetQnodeLoads(SQnodeLoad *pInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo);
|
|||
|
||||
void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo);
|
||||
void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo);
|
||||
void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -178,6 +178,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
|
|||
.sendMonitorReportFp = dmSendMonitorReport,
|
||||
.getVnodeLoadsFp = dmGetVnodeLoads,
|
||||
.getMnodeLoadsFp = dmGetMnodeLoads,
|
||||
.getQnodeLoadsFp = dmGetQnodeLoads,
|
||||
};
|
||||
|
||||
opt.msgCb = dmGetMsgcb(pWrapper->pDnode);
|
||||
|
|
|
@ -170,3 +170,17 @@ void dmGetMnodeLoads(SMonMloadInfo *pInfo) {
|
|||
dmReleaseWrapper(pWrapper);
|
||||
}
|
||||
}
|
||||
|
||||
void dmGetQnodeLoads(SQnodeLoad *pInfo) {
|
||||
SDnode *pDnode = dmInstance();
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[QNODE];
|
||||
if (dmMarkWrapper(pWrapper) == 0) {
|
||||
if (tsMultiProcess) {
|
||||
dmSendLocalRecv(pDnode, TDMT_MON_QM_LOAD, tDeserializeSQnodeLoad, pInfo);
|
||||
} else if (pWrapper->pMgmt != NULL) {
|
||||
qmGetQnodeLoads(pWrapper->pMgmt, pInfo);
|
||||
}
|
||||
dmReleaseWrapper(pWrapper);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
|
||||
#include "dnode.h"
|
||||
#include "mnode.h"
|
||||
#include "qnode.h"
|
||||
#include "monitor.h"
|
||||
#include "sync.h"
|
||||
#include "wal.h"
|
||||
|
@ -92,6 +93,7 @@ typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
|
|||
typedef void (*SendMonitorReportFp)();
|
||||
typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo);
|
||||
typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo);
|
||||
typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo);
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
|
@ -118,6 +120,7 @@ typedef struct {
|
|||
SendMonitorReportFp sendMonitorReportFp;
|
||||
GetVnodeLoadsFp getVnodeLoadsFp;
|
||||
GetMnodeLoadsFp getMnodeLoadsFp;
|
||||
GetQnodeLoadsFp getQnodeLoadsFp;
|
||||
} SMgmtInputOpt;
|
||||
|
||||
typedef struct {
|
||||
|
@ -180,4 +183,4 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DM_INT_H_*/
|
||||
#endif /*_TD_DM_INT_H_*/
|
||||
|
|
|
@ -219,6 +219,7 @@ typedef struct {
|
|||
int64_t createdTime;
|
||||
int64_t updateTime;
|
||||
SDnodeObj* pDnode;
|
||||
SQnodeLoad load;
|
||||
} SQnodeObj;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -22,9 +22,15 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define QNODE_LOAD_VALUE(pQnode) (pQnode ? (pQnode->load.numOfQueryInQueue + pQnode->load.numOfFetchInQueue) : 0)
|
||||
|
||||
int32_t mndInitQnode(SMnode *pMnode);
|
||||
void mndCleanupQnode(SMnode *pMnode);
|
||||
|
||||
SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId);
|
||||
void mndReleaseQnode(SMnode *pMnode, SQnodeObj *pObj);
|
||||
int32_t mndCreateQnodeList(SMnode *pMnode, SArray** pList, int32_t limit);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "mndDnode.h"
|
||||
#include "mndAuth.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndQnode.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
|
@ -388,6 +389,12 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
|||
mndReleaseMnode(pMnode, pObj);
|
||||
}
|
||||
|
||||
SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
|
||||
if (pQnode != NULL) {
|
||||
pQnode->load = statusReq.qload;
|
||||
mndReleaseQnode(pMnode, pQnode);
|
||||
}
|
||||
|
||||
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
|
||||
int64_t curMs = taosGetTimestampMs();
|
||||
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndQnode.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndStb.h"
|
||||
#include "mndUser.h"
|
||||
|
@ -382,6 +383,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
|||
rspBasic->totalDnodes = mndGetDnodeSize(pMnode);
|
||||
rspBasic->onlineDnodes = 1; // TODO
|
||||
mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
|
||||
|
||||
mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1);
|
||||
|
||||
mndReleaseConn(pMnode, pConn);
|
||||
|
||||
hbRsp.query = rspBasic;
|
||||
|
|
|
@ -60,7 +60,7 @@ int32_t mndInitQnode(SMnode *pMnode) {
|
|||
|
||||
void mndCleanupQnode(SMnode *pMnode) {}
|
||||
|
||||
static SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId) {
|
||||
SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId) {
|
||||
SQnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_QNODE, &qnodeId);
|
||||
if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
||||
terrno = TSDB_CODE_MND_QNODE_NOT_EXIST;
|
||||
|
@ -68,7 +68,7 @@ static SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId) {
|
|||
return pObj;
|
||||
}
|
||||
|
||||
static void mndReleaseQnode(SMnode *pMnode, SQnodeObj *pObj) {
|
||||
void mndReleaseQnode(SMnode *pMnode, SQnodeObj *pObj) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
|
@ -429,12 +429,49 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t mndCreateQnodeList(SMnode *pMnode, SArray** pList, int32_t limit) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SQnodeObj *pObj = NULL;
|
||||
int32_t numOfRows = 0;
|
||||
|
||||
SArray* qnodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
|
||||
if (NULL == qnodeList) {
|
||||
mError("failed to alloc epSet while process qnode list req");
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_QNODE, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SQueryNodeLoad nodeLoad = {0};
|
||||
nodeLoad.addr.nodeId = QNODE_HANDLE;
|
||||
nodeLoad.addr.epSet.numOfEps = 1;
|
||||
tstrncpy(nodeLoad.addr.epSet.eps[0].fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
nodeLoad.addr.epSet.eps[0].port = pObj->pDnode->port;
|
||||
nodeLoad.load = QNODE_LOAD_VALUE(pObj);
|
||||
|
||||
(void)taosArrayPush(qnodeList, &nodeLoad);
|
||||
|
||||
numOfRows++;
|
||||
sdbRelease(pSdb, pObj);
|
||||
|
||||
if (limit > 0 && numOfRows >= limit) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
*pList = qnodeList;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int32_t mndProcessQnodeListReq(SRpcMsg *pReq) {
|
||||
int32_t code = -1;
|
||||
int32_t numOfRows = 0;
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SQnodeObj *pObj = NULL;
|
||||
SQnodeListReq qlistReq = {0};
|
||||
SQnodeListRsp qlistRsp = {0};
|
||||
|
||||
|
@ -444,34 +481,10 @@ static int32_t mndProcessQnodeListReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
qlistRsp.addrsList = taosArrayInit(5, sizeof(SQueryNodeAddr));
|
||||
if (NULL == qlistRsp.addrsList) {
|
||||
mError("failed to alloc epSet while process qnode list req");
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
if (mndCreateQnodeList(pMnode, &qlistRsp.qnodeList, qlistReq.rowNum) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_QNODE, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SQueryNodeAddr nodeAddr = {0};
|
||||
nodeAddr.nodeId = QNODE_HANDLE;
|
||||
nodeAddr.epSet.numOfEps = 1;
|
||||
tstrncpy(nodeAddr.epSet.eps[0].fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
nodeAddr.epSet.eps[0].port = pObj->pDnode->port;
|
||||
|
||||
(void)taosArrayPush(qlistRsp.addrsList, &nodeAddr);
|
||||
|
||||
numOfRows++;
|
||||
sdbRelease(pSdb, pObj);
|
||||
|
||||
if (qlistReq.rowNum > 0 && numOfRows >= qlistReq.rowNum) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t rspLen = tSerializeSQnodeListRsp(NULL, 0, &qlistRsp);
|
||||
void *pRsp = rpcMallocCont(rspLen);
|
||||
if (pRsp == NULL) {
|
||||
|
|
|
@ -41,12 +41,24 @@ void qndClose(SQnode *pQnode) {
|
|||
}
|
||||
|
||||
int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) {
|
||||
SMsgCb* pCb = &pQnode->msgCb;
|
||||
SReadHandle handle = {.pMsgCb = &pQnode->msgCb};
|
||||
SQWorkerStat stat = {0};
|
||||
|
||||
pLoad->numOfQueryInQueue = pCb->qsizeFp(pCb->mgmt, pQnode->qndId, QUERY_QUEUE);
|
||||
pLoad->numOfFetchInQueue = pCb->qsizeFp(pCb->mgmt, pQnode->qndId, FETCH_QUEUE);
|
||||
pLoad->waitTimeInQueryQUeue = qWorkerGetWaitTimeInQueue(pQnode->pQuery, QUERY_QUEUE);
|
||||
pLoad->waitTimeInFetchQUeue = qWorkerGetWaitTimeInQueue(pQnode->pQuery, FETCH_QUEUE);
|
||||
int32_t code = qWorkerGetStat(&handle, pQnode->pQuery, &stat);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
pLoad->numOfQueryInQueue = stat.numOfQueryInQueue;
|
||||
pLoad->numOfFetchInQueue = stat.numOfFetchInQueue;
|
||||
pLoad->timeInQueryQueue = stat.timeInQueryQueue;
|
||||
pLoad->timeInFetchQueue = stat.timeInFetchQueue;
|
||||
pLoad->cacheDataSize = stat.cacheDataSize;
|
||||
pLoad->numOfProcessedQuery = stat.queryProcessed;
|
||||
pLoad->numOfProcessedCQuery = stat.cqueryProcessed;
|
||||
pLoad->numOfProcessedFetch = stat.fetchProcessed;
|
||||
pLoad->numOfProcessedDrop = stat.dropProcessed;
|
||||
pLoad->numOfProcessedHb = stat.hbProcessed;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -342,16 +342,16 @@ typedef struct SCtgOperation {
|
|||
ctgOpFunc func;
|
||||
} SCtgOperation;
|
||||
|
||||
#define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
|
||||
#define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
|
||||
#define CTG_QUEUE_INC() atomic_add_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
|
||||
#define CTG_QUEUE_DEC() atomic_sub_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
|
||||
|
||||
#define CTG_STAT_ADD(_item, _n) atomic_add_fetch_64(&(_item), _n)
|
||||
#define CTG_STAT_SUB(_item, _n) atomic_sub_fetch_64(&(_item), _n)
|
||||
#define CTG_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), _n)
|
||||
#define CTG_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n)
|
||||
#define CTG_STAT_GET(_item) atomic_load_64(&(_item))
|
||||
|
||||
#define CTG_RUNTIME_STAT_ADD(item, n) (CTG_STAT_ADD(gCtgMgmt.stat.runtime.item, n))
|
||||
#define CTG_CACHE_STAT_ADD(item, n) (CTG_STAT_ADD(gCtgMgmt.stat.cache.item, n))
|
||||
#define CTG_CACHE_STAT_SUB(item, n) (CTG_STAT_SUB(gCtgMgmt.stat.cache.item, n))
|
||||
#define CTG_RT_STAT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.stat.runtime.item, n))
|
||||
#define CTG_CACHE_STAT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.stat.cache.item, n))
|
||||
#define CTG_CACHE_STAT_DEC(item, n) (CTG_STAT_DEC(gCtgMgmt.stat.cache.item, n))
|
||||
|
||||
#define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE)
|
||||
#define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE)
|
||||
|
|
|
@ -558,7 +558,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
|
|||
|
||||
*catalogHandle = clusterCtg;
|
||||
|
||||
CTG_CACHE_STAT_ADD(clusterNum, 1);
|
||||
CTG_CACHE_STAT_INC(clusterNum, 1);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
|
@ -579,7 +579,7 @@ void catalogFreeHandle(SCatalog* pCtg) {
|
|||
return;
|
||||
}
|
||||
|
||||
CTG_CACHE_STAT_SUB(clusterNum, 1);
|
||||
CTG_CACHE_STAT_DEC(clusterNum, 1);
|
||||
|
||||
uint64_t clusterId = pCtg->clusterId;
|
||||
|
||||
|
@ -990,7 +990,7 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
|
|||
}
|
||||
|
||||
if (pReq->qNodeRequired) {
|
||||
pRsp->pQnodeList = taosArrayInit(10, sizeof(SQueryNodeAddr));
|
||||
pRsp->pQnodeList = taosArrayInit(10, sizeof(SQueryNodeLoad));
|
||||
CTG_ERR_JRET(ctgGetQnodeListFromMnode(CTG_PARAMS_LIST(), pRsp->pQnodeList, NULL));
|
||||
}
|
||||
|
||||
|
|
|
@ -178,7 +178,7 @@ int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCac
|
|||
|
||||
*pCache = dbCache;
|
||||
|
||||
CTG_CACHE_STAT_ADD(vgHitNum, 1);
|
||||
CTG_CACHE_STAT_INC(vgHitNum, 1);
|
||||
|
||||
ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);
|
||||
|
||||
|
@ -192,7 +192,7 @@ _return:
|
|||
|
||||
*pCache = NULL;
|
||||
|
||||
CTG_CACHE_STAT_ADD(vgMissNum, 1);
|
||||
CTG_CACHE_STAT_INC(vgMissNum, 1);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -279,7 +279,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
|
|||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
ctgDebug("Got meta from cache, type:%d, dbFName:%s, tbName:%s", tbMeta->tableType, dbFName, ctx->pName->tname);
|
||||
|
||||
CTG_CACHE_STAT_ADD(tblHitNum, 1);
|
||||
CTG_CACHE_STAT_INC(tblHitNum, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -312,7 +312,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
|
|||
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
|
||||
CTG_CACHE_STAT_ADD(tblHitNum, 1);
|
||||
CTG_CACHE_STAT_INC(tblHitNum, 1);
|
||||
|
||||
ctgDebug("Got tbmeta from cache, dbFName:%s, tbName:%s", dbFName, ctx->pName->tname);
|
||||
|
||||
|
@ -323,7 +323,7 @@ _return:
|
|||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
taosMemoryFreeClear(*pTableMeta);
|
||||
|
||||
CTG_CACHE_STAT_ADD(tblMissNum, 1);
|
||||
CTG_CACHE_STAT_INC(tblMissNum, 1);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
@ -462,7 +462,7 @@ int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFNam
|
|||
*inCache = true;
|
||||
|
||||
ctgDebug("Got user from cache, user:%s", user);
|
||||
CTG_CACHE_STAT_ADD(userHitNum, 1);
|
||||
CTG_CACHE_STAT_INC(userHitNum, 1);
|
||||
|
||||
if (pUser->superUser) {
|
||||
*pass = true;
|
||||
|
@ -491,7 +491,7 @@ int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFNam
|
|||
_return:
|
||||
|
||||
*inCache = false;
|
||||
CTG_CACHE_STAT_ADD(userMissNum, 1);
|
||||
CTG_CACHE_STAT_INC(userMissNum, 1);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -521,7 +521,7 @@ void ctgDequeue(SCtgCacheOperation **op) {
|
|||
SCtgQNode *node = gCtgMgmt.queue.head->next;
|
||||
gCtgMgmt.queue.head = gCtgMgmt.queue.head->next;
|
||||
|
||||
CTG_QUEUE_SUB();
|
||||
CTG_QUEUE_DEC();
|
||||
|
||||
taosMemoryFreeClear(orig);
|
||||
|
||||
|
@ -545,8 +545,8 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
|
|||
gCtgMgmt.queue.tail = node;
|
||||
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
|
||||
|
||||
CTG_QUEUE_ADD();
|
||||
CTG_RUNTIME_STAT_ADD(qNum, 1);
|
||||
CTG_QUEUE_INC();
|
||||
CTG_RT_STAT_INC(qNum, 1);
|
||||
|
||||
tsem_post(&gCtgMgmt.queue.reqSem);
|
||||
|
||||
|
@ -988,7 +988,7 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
|
|||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
CTG_CACHE_STAT_ADD(dbNum, 1);
|
||||
CTG_CACHE_STAT_INC(dbNum, 1);
|
||||
|
||||
SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1};
|
||||
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
|
||||
|
@ -1048,7 +1048,7 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d
|
|||
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
|
||||
}
|
||||
|
||||
CTG_CACHE_STAT_SUB(dbNum, 1);
|
||||
CTG_CACHE_STAT_DEC(dbNum, 1);
|
||||
|
||||
ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
|
||||
|
||||
|
@ -1187,7 +1187,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
|
|||
if (taosHashRemove(tbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
|
||||
ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
|
||||
} else {
|
||||
CTG_CACHE_STAT_SUB(stblNum, 1);
|
||||
CTG_CACHE_STAT_DEC(stblNum, 1);
|
||||
}
|
||||
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
|
||||
|
||||
|
@ -1214,7 +1214,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
|
|||
}
|
||||
|
||||
if (NULL == orig) {
|
||||
CTG_CACHE_STAT_ADD(tblNum, 1);
|
||||
CTG_CACHE_STAT_INC(tblNum, 1);
|
||||
}
|
||||
|
||||
ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
|
||||
|
@ -1233,7 +1233,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
|
|||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
CTG_CACHE_STAT_ADD(stblNum, 1);
|
||||
CTG_CACHE_STAT_INC(stblNum, 1);
|
||||
|
||||
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
|
||||
|
||||
|
@ -1371,14 +1371,14 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
|
|||
if (taosHashRemove(dbCache->tbCache.stbCache, &msg->suid, sizeof(msg->suid))) {
|
||||
ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
|
||||
} else {
|
||||
CTG_CACHE_STAT_SUB(stblNum, 1);
|
||||
CTG_CACHE_STAT_DEC(stblNum, 1);
|
||||
}
|
||||
|
||||
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
|
||||
if (taosHashRemove(dbCache->tbCache.metaCache, msg->stbName, strlen(msg->stbName))) {
|
||||
ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
|
||||
} else {
|
||||
CTG_CACHE_STAT_SUB(tblNum, 1);
|
||||
CTG_CACHE_STAT_DEC(tblNum, 1);
|
||||
}
|
||||
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
|
||||
|
||||
|
@ -1419,7 +1419,7 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
|
|||
ctgError("stb not exist in cache, dbFName:%s, tbName:%s", msg->dbFName, msg->tbName);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
} else {
|
||||
CTG_CACHE_STAT_SUB(tblNum, 1);
|
||||
CTG_CACHE_STAT_DEC(tblNum, 1);
|
||||
}
|
||||
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
|
||||
|
||||
|
@ -1578,7 +1578,7 @@ void* ctgUpdateThreadFunc(void* param) {
|
|||
tsem_post(&gCtgMgmt.queue.rspSem);
|
||||
}
|
||||
|
||||
CTG_RUNTIME_STAT_ADD(qDoneNum, 1);
|
||||
CTG_RT_STAT_INC(qDoneNum, 1);
|
||||
|
||||
ctgdShowClusterCache(pCtg);
|
||||
}
|
||||
|
|
|
@ -275,7 +275,7 @@ int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask) {
|
|||
}
|
||||
|
||||
if (pTask) {
|
||||
void* pOut = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
|
||||
void* pOut = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||
if (NULL == pOut) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
|
|
@ -85,7 +85,7 @@ void ctgFreeTbMetaCache(SCtgTbMetaCache *cache) {
|
|||
int32_t stblNum = taosHashGetSize(cache->stbCache);
|
||||
taosHashCleanup(cache->stbCache);
|
||||
cache->stbCache = NULL;
|
||||
CTG_CACHE_STAT_SUB(stblNum, stblNum);
|
||||
CTG_CACHE_STAT_DEC(stblNum, stblNum);
|
||||
}
|
||||
CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
|
||||
|
||||
|
@ -94,7 +94,7 @@ void ctgFreeTbMetaCache(SCtgTbMetaCache *cache) {
|
|||
int32_t tblNum = taosHashGetSize(cache->metaCache);
|
||||
taosHashCleanup(cache->metaCache);
|
||||
cache->metaCache = NULL;
|
||||
CTG_CACHE_STAT_SUB(tblNum, tblNum);
|
||||
CTG_CACHE_STAT_DEC(tblNum, tblNum);
|
||||
}
|
||||
CTG_UNLOCK(CTG_WRITE, &cache->metaLock);
|
||||
}
|
||||
|
@ -145,7 +145,7 @@ void ctgFreeHandle(SCatalog* pCtg) {
|
|||
|
||||
taosHashCleanup(pCtg->dbCache);
|
||||
|
||||
CTG_CACHE_STAT_SUB(dbNum, dbNum);
|
||||
CTG_CACHE_STAT_DEC(dbNum, dbNum);
|
||||
}
|
||||
|
||||
if (pCtg->userCache) {
|
||||
|
@ -162,7 +162,7 @@ void ctgFreeHandle(SCatalog* pCtg) {
|
|||
|
||||
taosHashCleanup(pCtg->userCache);
|
||||
|
||||
CTG_CACHE_STAT_SUB(userNum, userNum);
|
||||
CTG_CACHE_STAT_DEC(userNum, userNum);
|
||||
}
|
||||
|
||||
taosMemoryFree(pCtg);
|
||||
|
|
|
@ -37,6 +37,7 @@ typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds);
|
|||
typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd);
|
||||
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
|
||||
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
|
||||
typedef int32_t (*FGetCacheSize)(struct SDataSinkHandle* pHandle, uint64_t* size);
|
||||
|
||||
typedef struct SDataSinkHandle {
|
||||
FPutDataBlock fPut;
|
||||
|
@ -44,6 +45,7 @@ typedef struct SDataSinkHandle {
|
|||
FGetDataLength fGetLen;
|
||||
FGetDataBlock fGetData;
|
||||
FDestroyDataSinker fDestroy;
|
||||
FGetCacheSize fGetCacheSize;
|
||||
} SDataSinkHandle;
|
||||
|
||||
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle);
|
||||
|
|
|
@ -22,6 +22,8 @@
|
|||
#include "tglobal.h"
|
||||
#include "tqueue.h"
|
||||
|
||||
extern SDataSinkStat gDataSinkStat;
|
||||
|
||||
typedef struct SDataDispatchBuf {
|
||||
int32_t useSize;
|
||||
int32_t allocSize;
|
||||
|
@ -45,6 +47,7 @@ typedef struct SDataDispatchHandle {
|
|||
int32_t status;
|
||||
bool queryEnd;
|
||||
uint64_t useconds;
|
||||
uint64_t cachedSize;
|
||||
TdThreadMutex mutex;
|
||||
} SDataDispatchHandle;
|
||||
|
||||
|
@ -71,7 +74,7 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
|
|||
// +----------------+--------------+----------+--------------------------------------+-------------+-----------+-------------+-----------+
|
||||
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
|
||||
// recorded in the first segment, next to the struct header
|
||||
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
||||
static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
||||
int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots);
|
||||
|
||||
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
|
||||
|
@ -84,6 +87,9 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat
|
|||
blockCompressEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed);
|
||||
|
||||
pBuf->useSize += pEntry->dataLen;
|
||||
|
||||
atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
|
||||
atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
|
||||
}
|
||||
|
||||
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
||||
|
@ -156,6 +162,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE
|
|||
taosFreeQitem(pBuf);
|
||||
*pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
|
||||
*pQueryEnd = pDispatcher->queryEnd;
|
||||
qDebug("got data len %d, row num %d in sink", *pLen, ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->numOfRows);
|
||||
}
|
||||
|
||||
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
||||
|
@ -173,6 +180,10 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
|||
pOutput->numOfRows = pEntry->numOfRows;
|
||||
pOutput->numOfCols = pEntry->numOfCols;
|
||||
pOutput->compressed = pEntry->compressed;
|
||||
|
||||
atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen);
|
||||
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
|
||||
|
||||
taosMemoryFreeClear(pDispatcher->nextOutput.pData); // todo persistent
|
||||
pOutput->bufStatus = updateStatus(pDispatcher);
|
||||
taosThreadMutexLock(&pDispatcher->mutex);
|
||||
|
@ -180,11 +191,14 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
|||
pOutput->useconds = pDispatcher->useconds;
|
||||
pOutput->precision = pDispatcher->pSchema->precision;
|
||||
taosThreadMutexUnlock(&pDispatcher->mutex);
|
||||
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
|
||||
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize);
|
||||
taosMemoryFreeClear(pDispatcher->nextOutput.pData);
|
||||
while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
|
||||
SDataDispatchBuf* pBuf = NULL;
|
||||
|
@ -197,6 +211,13 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
|
||||
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||
|
||||
*size = atomic_load_64(&pDispatcher->cachedSize);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) {
|
||||
SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
|
||||
if (NULL == dispatcher) {
|
||||
|
@ -208,6 +229,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
|
|||
dispatcher->sink.fGetLen = getDataLength;
|
||||
dispatcher->sink.fGetData = getDataBlock;
|
||||
dispatcher->sink.fDestroy = destroyDataSinker;
|
||||
dispatcher->sink.fGetCacheSize = getCacheSize;
|
||||
dispatcher->pManager = pManager;
|
||||
dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
|
||||
dispatcher->status = DS_BUF_EMPTY;
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "planner.h"
|
||||
|
||||
static SDataSinkManager gDataSinkManager = {0};
|
||||
SDataSinkStat gDataSinkStat = {0};
|
||||
|
||||
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) {
|
||||
gDataSinkManager.cfg = *cfg;
|
||||
|
@ -26,6 +27,13 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) {
|
|||
return 0; // to avoid compiler eror
|
||||
}
|
||||
|
||||
int32_t dsDataSinkGetCacheSize(SDataSinkStat *pStat) {
|
||||
pStat->cachedSize = atomic_load_64(&gDataSinkStat.cachedSize);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHandle) {
|
||||
if (QUERY_NODE_PHYSICAL_PLAN_DISPATCH == nodeType(pDataSink)) {
|
||||
return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
|
||||
|
@ -53,6 +61,12 @@ int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
|
|||
return pHandleImpl->fGetData(pHandleImpl, pOutput);
|
||||
}
|
||||
|
||||
int32_t dsGetCacheSize(DataSinkHandle handle, uint64_t *pSize) {
|
||||
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
||||
return pHandleImpl->fGetCacheSize(pHandleImpl, pSize);
|
||||
}
|
||||
|
||||
|
||||
void dsScheduleProcess(void* ahandle, void* pItem) {
|
||||
// todo
|
||||
}
|
||||
|
|
|
@ -303,7 +303,9 @@ void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_
|
|||
int32_t dstSlotId = pExpr->base.resSchema.slotId;
|
||||
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||
|
||||
colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows);
|
||||
colInfoDataCleanup(pColInfoData, pBlock->info.rows);
|
||||
|
||||
int32_t functionId = pExpr->pExpr->_function.functionId;
|
||||
|
||||
|
|
|
@ -556,4 +556,50 @@ int32_t tDeserializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInf
|
|||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int32_t tSerializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pInfo->numOfProcessedQuery) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pInfo->numOfProcessedCQuery) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pInfo->numOfProcessedFetch) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pInfo->numOfProcessedDrop) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pInfo->numOfProcessedHb) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pInfo->cacheDataSize) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pInfo->numOfQueryInQueue) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pInfo->numOfFetchInQueue) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pInfo->timeInQueryQueue) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pInfo->timeInFetchQueue) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pInfo->numOfProcessedQuery) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pInfo->numOfProcessedCQuery) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pInfo->numOfProcessedFetch) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pInfo->numOfProcessedDrop) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pInfo->numOfProcessedHb) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pInfo->cacheDataSize) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pInfo->numOfQueryInQueue) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pInfo->numOfFetchInQueue) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pInfo->timeInQueryQueue) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pInfo->timeInFetchQueue) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -468,6 +468,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
|
||||
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTagScan, pPhyNode);
|
||||
}
|
||||
|
@ -489,7 +490,8 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
|
|||
pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
|
||||
}
|
||||
if (pCxt->pExecNodeList) {
|
||||
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
|
||||
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||
}
|
||||
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
|
||||
pTableScan->dataRequired = pScanLogicNode->dataRequired;
|
||||
|
@ -524,10 +526,11 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
|||
pScan->accountId = pCxt->pPlanCxt->acctId;
|
||||
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES)) {
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
SQueryNodeLoad node = { .addr = pSubplan->execNode, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
|
||||
} else {
|
||||
SQueryNodeAddr addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet};
|
||||
taosArrayPush(pCxt->pExecNodeList, &addr);
|
||||
SQueryNodeLoad node = { .addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||
}
|
||||
pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
|
||||
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
|
||||
|
@ -1248,7 +1251,8 @@ static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogic
|
|||
SVnodeModifLogicNode* pModif = (SVnodeModifLogicNode*)pLogicSubplan->pNode;
|
||||
pSubplan->msgType = pModif->msgType;
|
||||
pSubplan->execNode.epSet = pModif->pVgDataBlocks->vg.epSet;
|
||||
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
|
||||
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||
code = createDataInserter(pCxt, pModif->pVgDataBlocks, &pSubplan->pDataSink);
|
||||
} else {
|
||||
pSubplan->msgType = TDMT_VND_QUERY;
|
||||
|
|
|
@ -373,7 +373,7 @@ int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) {
|
|||
return code;
|
||||
}
|
||||
|
||||
out.addrsList = (SArray *)output;
|
||||
out.qnodeList = (SArray *)output;
|
||||
if (tDeserializeSQnodeListRsp(msg, msgSize, &out) != 0) {
|
||||
qError("invalid qnode list rsp msg, msgSize:%d", msgSize);
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
|
|
|
@ -145,13 +145,30 @@ typedef struct SQWSchStatus {
|
|||
SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus
|
||||
} SQWSchStatus;
|
||||
|
||||
typedef struct SQWWaitTimeStat {
|
||||
typedef struct SQWTimeInQ {
|
||||
uint64_t num;
|
||||
uint64_t total;
|
||||
} SQWWaitTimeStat;
|
||||
} SQWTimeInQ;
|
||||
|
||||
typedef struct SQWMsgStat {
|
||||
SQWTimeInQ waitTime[2];
|
||||
uint64_t queryProcessed;
|
||||
uint64_t cqueryProcessed;
|
||||
uint64_t fetchProcessed;
|
||||
uint64_t fetchRspProcessed;
|
||||
uint64_t cancelProcessed;
|
||||
uint64_t dropProcessed;
|
||||
uint64_t hbProcessed;
|
||||
} SQWMsgStat;
|
||||
|
||||
typedef struct SQWRTStat {
|
||||
uint64_t startTaskNum;
|
||||
uint64_t stopTaskNum;
|
||||
} SQWRTStat;
|
||||
|
||||
typedef struct SQWStat {
|
||||
SQWWaitTimeStat msgWait[2];
|
||||
SQWMsgStat msgStat;
|
||||
SQWRTStat rtStat;
|
||||
} SQWStat;
|
||||
|
||||
// Qnode/Vnode level task management
|
||||
|
@ -182,10 +199,13 @@ typedef struct SQWorkerMgmt {
|
|||
#define QW_IDS() sId, qId, tId, rId
|
||||
#define QW_FPARAMS() mgmt, QW_IDS()
|
||||
|
||||
#define QW_GET_EVENT_VALUE(ctx, event) atomic_load_8(&(ctx)->events[event])
|
||||
#define QW_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), _n)
|
||||
#define QW_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n)
|
||||
#define QW_STAT_GET(_item) atomic_load_64(&(_item))
|
||||
|
||||
#define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED)
|
||||
#define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED)
|
||||
#define QW_GET_EVENT(ctx, event) atomic_load_8(&(ctx)->events[event])
|
||||
#define QW_IS_EVENT_RECEIVED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_RECEIVED)
|
||||
#define QW_IS_EVENT_PROCESSED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_PROCESSED)
|
||||
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
|
||||
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
|
||||
|
||||
|
@ -332,8 +352,8 @@ int32_t qwDropTask(QW_FPARAMS_DEF);
|
|||
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx);
|
||||
int32_t qwOpenRef(void);
|
||||
void qwSetHbParam(int64_t refId, SQWHbParam **pParam);
|
||||
int32_t qwUpdateWaitTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
|
||||
int64_t qwGetWaitTimeInQueue(SQWorker *mgmt, EQueueType type);
|
||||
int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
|
||||
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type);
|
||||
|
||||
void qwDbgDumpMgmtInfo(SQWorker *mgmt);
|
||||
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);
|
||||
|
|
|
@ -257,7 +257,8 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
|||
SSubQueryMsg *msg = pMsg->pCont;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
qwUpdateWaitTimeInQueue(mgmt, ts, QUERY_QUEUE);
|
||||
qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
|
||||
QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1);
|
||||
|
||||
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
|
||||
QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -297,7 +298,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
|
|||
SQWTaskCtx * handles = NULL;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
qwUpdateWaitTimeInQueue(mgmt, ts, QUERY_QUEUE);
|
||||
qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
|
||||
QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1);
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -328,7 +330,8 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
|||
SResFetchReq *msg = pMsg->pCont;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1);
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -357,7 +360,10 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
|||
|
||||
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
if (mgmt) {
|
||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
QW_STAT_INC(mgmt->stat.msgStat.fetchRspProcessed, 1);
|
||||
}
|
||||
|
||||
qProcessFetchRsp(NULL, pMsg, NULL);
|
||||
pMsg->pCont = NULL;
|
||||
|
@ -373,7 +379,8 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
|
|||
int32_t code = 0;
|
||||
STaskCancelReq *msg = pMsg->pCont;
|
||||
|
||||
qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1);
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
qError("invalid task cancel msg");
|
||||
|
@ -411,7 +418,8 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
|
|||
STaskDropReq *msg = pMsg->pCont;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1);
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
|
@ -452,7 +460,8 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
|
|||
SSchedulerHbReq req = {0};
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
qwUpdateWaitTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1);
|
||||
|
||||
if (NULL == pMsg->pCont) {
|
||||
QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
|
||||
|
|
|
@ -499,7 +499,7 @@ int32_t qwOpenRef(void) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwUpdateWaitTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type) {
|
||||
int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type) {
|
||||
if (ts <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -507,12 +507,12 @@ int32_t qwUpdateWaitTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type) {
|
|||
int64_t duration = taosGetTimestampUs() - ts;
|
||||
switch (type) {
|
||||
case QUERY_QUEUE:
|
||||
++mgmt->stat.msgWait[0].num;
|
||||
mgmt->stat.msgWait[0].total += duration;
|
||||
++mgmt->stat.msgStat.waitTime[0].num;
|
||||
mgmt->stat.msgStat.waitTime[0].total += duration;
|
||||
break;
|
||||
case FETCH_QUEUE:
|
||||
++mgmt->stat.msgWait[1].num;
|
||||
mgmt->stat.msgWait[1].total += duration;
|
||||
++mgmt->stat.msgStat.waitTime[1].num;
|
||||
mgmt->stat.msgStat.waitTime[1].total += duration;
|
||||
break;
|
||||
default:
|
||||
qError("unsupported queue type %d", type);
|
||||
|
@ -522,19 +522,20 @@ int32_t qwUpdateWaitTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int64_t qwGetWaitTimeInQueue(SQWorker *mgmt, EQueueType type) {
|
||||
SQWWaitTimeStat *pStat = NULL;
|
||||
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
|
||||
SQWTimeInQ *pStat = NULL;
|
||||
switch (type) {
|
||||
case QUERY_QUEUE:
|
||||
pStat = &mgmt->stat.msgWait[0];
|
||||
pStat = &mgmt->stat.msgStat.waitTime[0];
|
||||
return pStat->num ? (pStat->total/pStat->num) : 0;
|
||||
case FETCH_QUEUE:
|
||||
pStat = &mgmt->stat.msgWait[1];
|
||||
pStat = &mgmt->stat.msgStat.waitTime[1];
|
||||
return pStat->num ? (pStat->total/pStat->num) : 0;
|
||||
default:
|
||||
qError("unsupported queue type %d", type);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
#include "qworker.h"
|
||||
#include "dataSinkMgt.h"
|
||||
#include "executor.h"
|
||||
#include "planner.h"
|
||||
|
@ -8,6 +7,7 @@
|
|||
#include "tcommon.h"
|
||||
#include "tmsg.h"
|
||||
#include "tname.h"
|
||||
#include "qworker.h"
|
||||
|
||||
SQWorkerMgmt gQwMgmt = {
|
||||
.lock = 0,
|
||||
|
@ -954,8 +954,29 @@ void qWorkerDestroy(void **qWorkerMgmt) {
|
|||
}
|
||||
}
|
||||
|
||||
int64_t qWorkerGetWaitTimeInQueue(void *qWorkerMgmt, EQueueType type) {
|
||||
return qwGetWaitTimeInQueue((SQWorker *)qWorkerMgmt, type);
|
||||
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat) {
|
||||
if (NULL == handle || NULL == qWorkerMgmt || NULL == pStat) {
|
||||
QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SDataSinkStat sinkStat = {0};
|
||||
|
||||
dsDataSinkGetCacheSize(&sinkStat);
|
||||
pStat->cacheDataSize = sinkStat.cachedSize;
|
||||
|
||||
pStat->queryProcessed = QW_STAT_GET(mgmt->stat.msgStat.queryProcessed);
|
||||
pStat->cqueryProcessed = QW_STAT_GET(mgmt->stat.msgStat.cqueryProcessed);
|
||||
pStat->fetchProcessed = QW_STAT_GET(mgmt->stat.msgStat.fetchProcessed);
|
||||
pStat->dropProcessed = QW_STAT_GET(mgmt->stat.msgStat.dropProcessed);
|
||||
pStat->hbProcessed = QW_STAT_GET(mgmt->stat.msgStat.hbProcessed);
|
||||
|
||||
pStat->numOfQueryInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, QUERY_QUEUE);
|
||||
pStat->numOfFetchInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, FETCH_QUEUE);
|
||||
pStat->timeInQueryQueue = qwGetTimeInQueue((SQWorker *)qWorkerMgmt, QUERY_QUEUE);
|
||||
pStat->timeInFetchQueue = qwGetTimeInQueue((SQWorker *)qWorkerMgmt, FETCH_QUEUE);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -300,7 +300,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask);
|
|||
int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp);
|
||||
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp);
|
||||
void schProcessOnDataFetched(SSchJob *job);
|
||||
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
|
||||
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask);
|
||||
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode);
|
||||
void schFreeRpcCtxVal(const void *arg);
|
||||
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
|
||||
|
@ -314,12 +314,11 @@ int32_t schCancelJob(SSchJob *pJob);
|
|||
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode);
|
||||
uint64_t schGenTaskId(void);
|
||||
void schCloseJobRef(void);
|
||||
int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
|
||||
int64_t startTs, SSchResInfo *pRes);
|
||||
int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
|
||||
int64_t startTs, SSchResInfo *pRes);
|
||||
int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SSchResInfo *pRes);
|
||||
int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SSchResInfo *pRes);
|
||||
int32_t schFetchRows(SSchJob *pJob);
|
||||
int32_t schAsyncFetchRows(SSchJob *pJob);
|
||||
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t rspCode);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -342,6 +342,36 @@ int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *ad
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle) {
|
||||
if (NULL == pTask->execNodes) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t num = taosArrayGetSize(pTask->execNodes);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SSchNodeInfo* pNode = taosArrayGet(pTask->execNodes, i);
|
||||
if (pNode->handle == handle) {
|
||||
taosArrayRemove(pTask->execNodes, i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t rspCode) {
|
||||
SCH_SET_TASK_HANDLE(pTask, handle);
|
||||
|
||||
schUpdateTaskExecNodeHandle(pTask, handle, rspCode);
|
||||
|
||||
if (msgType == TDMT_SCH_LINK_BROKEN) {
|
||||
schDropTaskExecNode(pJob, pTask, handle);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t schRecordQueryDataSrc(SSchJob *pJob, SSchTask *pTask) {
|
||||
if (!SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -469,6 +499,34 @@ _return:
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
|
||||
int32_t addNum = 0;
|
||||
int32_t nodeNum = 0;
|
||||
|
||||
if (pJob->nodeList) {
|
||||
nodeNum = taosArrayGetSize(pJob->nodeList);
|
||||
|
||||
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
|
||||
SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);
|
||||
|
||||
if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
|
||||
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
++addNum;
|
||||
}
|
||||
}
|
||||
|
||||
if (addNum <= 0) {
|
||||
SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
||||
if (NULL != pTask->candidateAddrs) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -492,27 +550,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t addNum = 0;
|
||||
int32_t nodeNum = 0;
|
||||
if (pJob->nodeList) {
|
||||
nodeNum = taosArrayGetSize(pJob->nodeList);
|
||||
|
||||
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
|
||||
SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);
|
||||
|
||||
if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
|
||||
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
++addNum;
|
||||
}
|
||||
}
|
||||
|
||||
if (addNum <= 0) {
|
||||
SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));
|
||||
|
||||
/*
|
||||
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
|
||||
|
@ -1001,19 +1039,19 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
|||
*/
|
||||
|
||||
for (int32_t i = 0; i < parentNum; ++i) {
|
||||
SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i);
|
||||
int32_t readyNum = atomic_add_fetch_32(&par->childReady, 1);
|
||||
SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);
|
||||
int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
|
||||
|
||||
SCH_LOCK(SCH_WRITE, &par->lock);
|
||||
SCH_LOCK(SCH_WRITE, &parent->lock);
|
||||
SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE,
|
||||
.taskId = pTask->taskId,
|
||||
.schedId = schMgmt.sId,
|
||||
.addr = pTask->succeedAddr};
|
||||
qSetSubplanExecutionNode(par->plan, pTask->plan->id.groupId, &source);
|
||||
SCH_UNLOCK(SCH_WRITE, &par->lock);
|
||||
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
|
||||
SCH_UNLOCK(SCH_WRITE, &parent->lock);
|
||||
|
||||
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, par)) {
|
||||
SCH_ERR_RET(schLaunchTask(pJob, par));
|
||||
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
|
||||
SCH_ERR_RET(schLaunchTask(pJob, parent));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1087,7 +1125,7 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
|
||||
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
|
||||
int32_t s = taosHashGetSize(pTaskList);
|
||||
if (s <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1103,6 +1141,21 @@ int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
|
||||
schGetTaskFromList(pJob->execTasks, taskId, pTask);
|
||||
if (NULL == *pTask) {
|
||||
schGetTaskFromList(pJob->succTasks, taskId, pTask);
|
||||
|
||||
if (NULL == *pTask) {
|
||||
SCH_JOB_ELOG("task not found in execList & succList, taskId:%" PRIx64, taskId);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) {
|
||||
if (rspCode || NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 ||
|
||||
taosArrayGetSize(pTask->execNodes) <= 0) {
|
||||
|
|
|
@ -358,27 +358,11 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
|
|||
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
|
||||
}
|
||||
|
||||
schGetTaskFromTaskList(pJob->execTasks, pParam->taskId, &pTask);
|
||||
if (NULL == pTask) {
|
||||
if (TDMT_VND_EXPLAIN_RSP == msgType) {
|
||||
schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask);
|
||||
} else {
|
||||
SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
|
||||
pParam->taskId);
|
||||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
if (NULL == pTask) {
|
||||
SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
|
||||
pParam->taskId);
|
||||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
SCH_ERR_JRET(schGetTaskInJob(pJob, pParam->taskId, &pTask));
|
||||
|
||||
SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
|
||||
|
||||
SCH_SET_TASK_HANDLE(pTask, pMsg->handle);
|
||||
schUpdateTaskExecNodeHandle(pTask, pMsg->handle, rspCode);
|
||||
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, msgType, pMsg->handle, rspCode));
|
||||
|
||||
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
|
||||
|
||||
|
|
|
@ -141,7 +141,7 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
|
|||
|
||||
if (pJob->status < JOB_TASK_STATUS_NOT_START || pJob->levelNum <= 0 || NULL == pJob->levels) {
|
||||
qDebug("job not initialized or not executable job, refId:%" PRIx64, job);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
|
||||
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
|
||||
}
|
||||
|
||||
for (int32_t i = pJob->levelNum - 1; i >= 0; --i) {
|
||||
|
@ -155,7 +155,11 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
|
|||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_return:
|
||||
|
||||
schReleaseJob(job);
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
int32_t scheduleCancelJob(int64_t job) {
|
||||
|
|
Loading…
Reference in New Issue