Merge pull request #10032 from taosdata/feature/3.0_liaohj
Feature/3.0 liaohj
This commit is contained in:
commit
173f120007
|
@ -1,6 +1,10 @@
|
|||
#ifndef TDENGINE_TEP_H
|
||||
#define TDENGINE_TEP_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "os.h"
|
||||
#include "tmsg.h"
|
||||
|
||||
|
@ -9,10 +13,16 @@ typedef struct SCorEpSet {
|
|||
SEpSet epSet;
|
||||
} SCorEpSet;
|
||||
|
||||
int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port);
|
||||
int taosGetFqdnPortFromEp(const char *ep, SEp *pEp);
|
||||
void addEpIntoEpSet(SEpSet *pEpSet, const char *fqdn, uint16_t port);
|
||||
|
||||
bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2);
|
||||
|
||||
void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet);
|
||||
void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet);
|
||||
SEpSet getEpSet_s(SCorEpSet *pEpSet);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif // TDENGINE_TEP_H
|
||||
|
|
|
@ -154,10 +154,10 @@ typedef struct {
|
|||
#pragma pack(push, 1)
|
||||
|
||||
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
|
||||
typedef struct {
|
||||
typedef struct SEp {
|
||||
char fqdn[TSDB_FQDN_LEN];
|
||||
uint16_t port;
|
||||
} SEpAddr;
|
||||
} SEp;
|
||||
|
||||
typedef struct {
|
||||
int32_t contLen;
|
||||
|
@ -266,8 +266,7 @@ typedef struct {
|
|||
typedef struct SEpSet {
|
||||
int8_t inUse;
|
||||
int8_t numOfEps;
|
||||
uint16_t port[TSDB_MAX_REPLICA];
|
||||
char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN];
|
||||
SEp eps[TSDB_MAX_REPLICA];
|
||||
} SEpSet;
|
||||
|
||||
static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) {
|
||||
|
@ -275,8 +274,8 @@ static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) {
|
|||
tlen += taosEncodeFixedI8(buf, pEp->inUse);
|
||||
tlen += taosEncodeFixedI8(buf, pEp->numOfEps);
|
||||
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
|
||||
tlen += taosEncodeFixedU16(buf, pEp->port[i]);
|
||||
tlen += taosEncodeString(buf, pEp->fqdn[i]);
|
||||
tlen += taosEncodeFixedU16(buf, pEp->eps[i].port);
|
||||
tlen += taosEncodeString(buf, pEp->eps[i].fqdn);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
@ -285,8 +284,8 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) {
|
|||
buf = taosDecodeFixedI8(buf, &pEp->inUse);
|
||||
buf = taosDecodeFixedI8(buf, &pEp->numOfEps);
|
||||
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
|
||||
buf = taosDecodeFixedU16(buf, &pEp->port[i]);
|
||||
buf = taosDecodeStringTo(buf, pEp->fqdn[i]);
|
||||
buf = taosDecodeFixedU16(buf, &pEp->eps[i].port);
|
||||
buf = taosDecodeStringTo(buf, pEp->eps[i].fqdn);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
@ -617,8 +616,7 @@ typedef struct {
|
|||
int32_t id;
|
||||
int8_t isMnode;
|
||||
int8_t align;
|
||||
uint16_t port;
|
||||
char fqdn[TSDB_FQDN_LEN];
|
||||
SEp ep;
|
||||
} SDnodeEp;
|
||||
|
||||
typedef struct {
|
||||
|
@ -691,24 +689,17 @@ typedef struct {
|
|||
char tableNames[];
|
||||
} SMultiTableInfoReq;
|
||||
|
||||
// todo refactor
|
||||
typedef struct SVgroupInfo {
|
||||
int32_t vgId;
|
||||
uint32_t hashBegin;
|
||||
uint32_t hashEnd;
|
||||
int8_t inUse;
|
||||
int8_t numOfEps;
|
||||
SEpAddr epAddr[TSDB_MAX_REPLICA];
|
||||
SEpSet epset;
|
||||
} SVgroupInfo;
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int8_t numOfEps;
|
||||
SEpAddr epAddr[TSDB_MAX_REPLICA];
|
||||
} SVgroupMsg;
|
||||
|
||||
typedef struct {
|
||||
int32_t numOfVgroups;
|
||||
SVgroupMsg vgroups[];
|
||||
int32_t numOfVgroups;
|
||||
SVgroupInfo vgroups[];
|
||||
} SVgroupsInfo;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -128,20 +128,9 @@ typedef struct SMsgSendInfo {
|
|||
|
||||
typedef struct SQueryNodeAddr {
|
||||
int32_t nodeId; // vgId or qnodeId
|
||||
int8_t inUse;
|
||||
int8_t numOfEps;
|
||||
SEpAddr epAddr[TSDB_MAX_REPLICA];
|
||||
SEpSet epset;
|
||||
} SQueryNodeAddr;
|
||||
|
||||
static FORCE_INLINE void tConvertQueryAddrToEpSet(SEpSet* pEpSet, const SQueryNodeAddr* pAddr) {
|
||||
pEpSet->inUse = pAddr->inUse;
|
||||
pEpSet->numOfEps = pAddr->numOfEps;
|
||||
for (int j = 0; j < TSDB_MAX_REPLICA; j++) {
|
||||
pEpSet->port[j] = pAddr->epAddr[j].port;
|
||||
memcpy(pEpSet->fqdn[j], pAddr->epAddr[j].fqdn, TSDB_FQDN_LEN);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t initTaskQueue();
|
||||
int32_t cleanupTaskQueue();
|
||||
|
||||
|
|
|
@ -20,21 +20,6 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
//typedef struct SEpAddr {
|
||||
// char fqdn[TSDB_FQDN_LEN];
|
||||
// uint16_t port;
|
||||
//} SEpAddr;
|
||||
//
|
||||
//typedef struct SVgroup {
|
||||
// int32_t vgId;
|
||||
// int8_t numOfEps;
|
||||
// SEpAddr epAddr[TSDB_MAX_REPLICA];
|
||||
//} SVgroup;
|
||||
//
|
||||
//typedef struct SVgroupsInfo {
|
||||
// int32_t numOfVgroups;
|
||||
// SVgroup vgroups[];
|
||||
//} SVgroupsInfo;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -101,7 +101,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
|
|||
}
|
||||
|
||||
if (port) {
|
||||
epSet.epSet.port[0] = port;
|
||||
epSet.epSet.eps[0].port = port;
|
||||
}
|
||||
} else {
|
||||
if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
|
||||
|
@ -806,7 +806,7 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe
|
|||
return -1;
|
||||
}
|
||||
|
||||
taosGetFqdnPortFromEp(firstEp, mgmtEpSet->fqdn[0], &(mgmtEpSet->port[0]));
|
||||
taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
|
||||
mgmtEpSet->numOfEps++;
|
||||
}
|
||||
|
||||
|
@ -816,7 +816,7 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe
|
|||
return -1;
|
||||
}
|
||||
|
||||
taosGetFqdnPortFromEp(secondEp, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps]));
|
||||
taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
|
||||
mgmtEpSet->numOfEps++;
|
||||
}
|
||||
|
||||
|
@ -1021,14 +1021,7 @@ void* doFetchRow(SRequestObj* pRequest) {
|
|||
SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
|
||||
SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
|
||||
|
||||
epSet.numOfEps = pVgroupInfo->numOfEps;
|
||||
epSet.inUse = pVgroupInfo->inUse;
|
||||
|
||||
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
|
||||
strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i]));
|
||||
epSet.port[i] = pVgroupInfo->epAddr[i].port;
|
||||
}
|
||||
|
||||
epSet = pVgroupInfo->epset;
|
||||
} else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
|
||||
pRequest->type = TDMT_VND_SHOW_TABLES;
|
||||
SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
|
||||
|
@ -1045,14 +1038,7 @@ void* doFetchRow(SRequestObj* pRequest) {
|
|||
pRequest->body.requestMsg.pData = pShowReq;
|
||||
|
||||
SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
|
||||
|
||||
epSet.numOfEps = pVgroupInfo->numOfEps;
|
||||
epSet.inUse = pVgroupInfo->inUse;
|
||||
|
||||
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
|
||||
strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i]));
|
||||
epSet.port[i] = pVgroupInfo->epAddr[i].port;
|
||||
}
|
||||
epSet = pVgroupInfo->epset;
|
||||
|
||||
int64_t transporterId = 0;
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
|
|
|
@ -53,7 +53,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
|
||||
assert(pConnect->epSet.numOfEps > 0);
|
||||
for(int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) {
|
||||
pConnect->epSet.port[i] = htons(pConnect->epSet.port[i]);
|
||||
pConnect->epSet.eps[i].port = htons(pConnect->epSet.eps[i].port);
|
||||
}
|
||||
|
||||
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pConnect->epSet)) {
|
||||
|
@ -61,7 +61,8 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
}
|
||||
|
||||
for (int i = 0; i < pConnect->epSet.numOfEps; ++i) {
|
||||
tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%"PRIx64, pRequest->requestId, i, pConnect->epSet.fqdn[i], pConnect->epSet.port[i], pTscObj->id);
|
||||
tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%"PRIx64, pRequest->requestId, i, pConnect->epSet.eps[i].fqdn,
|
||||
pConnect->epSet.eps[i].port, pTscObj->id);
|
||||
}
|
||||
|
||||
pTscObj->connId = pConnect->connId;
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -2,32 +2,43 @@
|
|||
#include "tglobal.h"
|
||||
#include "tlockfree.h"
|
||||
|
||||
int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port) {
|
||||
*port = 0;
|
||||
strcpy(fqdn, ep);
|
||||
int taosGetFqdnPortFromEp(const char *ep, SEp* pEp) {
|
||||
pEp->port = 0;
|
||||
strcpy(pEp->fqdn, ep);
|
||||
|
||||
char *temp = strchr(fqdn, ':');
|
||||
char *temp = strchr(pEp->fqdn, ':');
|
||||
if (temp) {
|
||||
*temp = 0;
|
||||
*port = atoi(temp+1);
|
||||
pEp->port = atoi(temp+1);
|
||||
}
|
||||
|
||||
if (*port == 0) {
|
||||
*port = tsServerPort;
|
||||
if (pEp->port == 0) {
|
||||
pEp->port = tsServerPort;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void addEpIntoEpSet(SEpSet *pEpSet, const char* fqdn, uint16_t port) {
|
||||
if (pEpSet == NULL || fqdn == NULL || strlen(fqdn) == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t index = pEpSet->numOfEps;
|
||||
tstrncpy(pEpSet->eps[index].fqdn, fqdn, tListLen(pEpSet->eps[index].fqdn));
|
||||
pEpSet->eps[index].port = port;
|
||||
pEpSet->numOfEps += 1;
|
||||
}
|
||||
|
||||
bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2) {
|
||||
if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < s1->numOfEps; i++) {
|
||||
if (s1->port[i] != s2->port[i]
|
||||
|| strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
|
||||
if (s1->eps[i].port != s2->eps[i].port
|
||||
|| strncmp(s1->eps[i].fqdn, s2->eps[i].fqdn, TSDB_FQDN_LEN) != 0)
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
|
|
@ -1080,9 +1080,7 @@ static void doInitGlobalConfig(void) {
|
|||
void taosInitGlobalCfg() { pthread_once(&tsInitGlobalCfgOnce, doInitGlobalConfig); }
|
||||
|
||||
int32_t taosCheckAndPrintCfg() {
|
||||
char fqdn[TSDB_FQDN_LEN];
|
||||
uint16_t port;
|
||||
|
||||
SEp ep = {0};
|
||||
if (debugFlag & DEBUG_TRACE || debugFlag & DEBUG_DEBUG || debugFlag & DEBUG_DUMP) {
|
||||
taosSetAllDebugFlag();
|
||||
}
|
||||
|
@ -1097,15 +1095,15 @@ int32_t taosCheckAndPrintCfg() {
|
|||
if (tsFirst[0] == 0) {
|
||||
strcpy(tsFirst, tsLocalEp);
|
||||
} else {
|
||||
taosGetFqdnPortFromEp(tsFirst, fqdn, &port);
|
||||
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", fqdn, port);
|
||||
taosGetFqdnPortFromEp(tsFirst, &ep);
|
||||
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", ep.fqdn, ep.port);
|
||||
}
|
||||
|
||||
if (tsSecond[0] == 0) {
|
||||
strcpy(tsSecond, tsLocalEp);
|
||||
} else {
|
||||
taosGetFqdnPortFromEp(tsSecond, fqdn, &port);
|
||||
snprintf(tsSecond, sizeof(tsSecond), "%s:%u", fqdn, port);
|
||||
taosGetFqdnPortFromEp(tsSecond, &ep);
|
||||
snprintf(tsSecond, sizeof(tsSecond), "%s:%u", ep.fqdn, ep.port);
|
||||
}
|
||||
|
||||
taosCheckDataDirCfg();
|
||||
|
|
|
@ -57,13 +57,13 @@ void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint
|
|||
SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t));
|
||||
if (pDnodeEp != NULL) {
|
||||
if (pPort != NULL) {
|
||||
*pPort = pDnodeEp->port;
|
||||
*pPort = pDnodeEp->ep.port;
|
||||
}
|
||||
if (pFqdn != NULL) {
|
||||
tstrncpy(pFqdn, pDnodeEp->fqdn, TSDB_FQDN_LEN);
|
||||
tstrncpy(pFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
|
||||
}
|
||||
if (pEp != NULL) {
|
||||
snprintf(pEp, TSDB_EP_LEN, "%s:%u", pDnodeEp->fqdn, pDnodeEp->port);
|
||||
snprintf(pEp, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -85,12 +85,12 @@ void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) {
|
|||
|
||||
dDebug("RPC %p, req:%s is redirected, num:%d use:%d", pReq->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse);
|
||||
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
|
||||
dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]);
|
||||
if (strcmp(epSet.fqdn[i], pDnode->cfg.localFqdn) == 0 && epSet.port[i] == pDnode->cfg.serverPort) {
|
||||
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
|
||||
if (strcmp(epSet.eps[i].fqdn, pDnode->cfg.localFqdn) == 0 && epSet.eps[i].port == pDnode->cfg.serverPort) {
|
||||
epSet.inUse = (i + 1) % epSet.numOfEps;
|
||||
}
|
||||
|
||||
epSet.port[i] = htons(epSet.port[i]);
|
||||
epSet.eps[i].port = htons(epSet.eps[i].port);
|
||||
}
|
||||
|
||||
rpcSendRedirectRsp(pReq->handle, &epSet);
|
||||
|
@ -104,7 +104,7 @@ static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
|
|||
|
||||
pMgmt->mnodeEpSet = *pEpSet;
|
||||
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
|
||||
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]);
|
||||
dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pMgmt->latch);
|
||||
|
@ -116,7 +116,7 @@ static void dndPrintDnodes(SDnode *pDnode) {
|
|||
dDebug("print dnode ep list, num:%d", pMgmt->dnodeEps->num);
|
||||
for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) {
|
||||
SDnodeEp *pEp = &pMgmt->dnodeEps->eps[i];
|
||||
dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->fqdn, pEp->port, pEp->isMnode);
|
||||
dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -145,8 +145,8 @@ static void dndResetDnodes(SDnode *pDnode, SDnodeEps *pDnodeEps) {
|
|||
if (!pDnodeEp->isMnode) continue;
|
||||
if (mIndex >= TSDB_MAX_REPLICA) continue;
|
||||
pMgmt->mnodeEpSet.numOfEps++;
|
||||
strcpy(pMgmt->mnodeEpSet.fqdn[mIndex], pDnodeEp->fqdn);
|
||||
pMgmt->mnodeEpSet.port[mIndex] = pDnodeEp->port;
|
||||
|
||||
pMgmt->mnodeEpSet.eps[mIndex] = pDnodeEp->ep;
|
||||
mIndex++;
|
||||
}
|
||||
|
||||
|
@ -167,7 +167,7 @@ static bool dndIsEpChanged(SDnode *pDnode, int32_t dnodeId, char *pEp) {
|
|||
SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t));
|
||||
if (pDnodeEp != NULL) {
|
||||
char epstr[TSDB_EP_LEN + 1];
|
||||
snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->fqdn, pDnodeEp->port);
|
||||
snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
|
||||
changed = strcmp(pEp, epstr) != 0;
|
||||
}
|
||||
|
||||
|
@ -251,11 +251,12 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
|
|||
|
||||
SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
|
||||
|
||||
cJSON *dnodeId = cJSON_GetObjectItem(node, "id");
|
||||
if (!dnodeId || dnodeId->type != cJSON_Number) {
|
||||
cJSON *did = cJSON_GetObjectItem(node, "id");
|
||||
if (!did || did->type != cJSON_Number) {
|
||||
dError("failed to read %s since dnodeId not found", pMgmt->file);
|
||||
goto PRASE_DNODE_OVER;
|
||||
}
|
||||
|
||||
pDnodeEp->id = dnodeId->valueint;
|
||||
|
||||
cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
|
||||
|
@ -263,14 +264,15 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
|
|||
dError("failed to read %s since dnodeFqdn not found", pMgmt->file);
|
||||
goto PRASE_DNODE_OVER;
|
||||
}
|
||||
tstrncpy(pDnodeEp->fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
|
||||
tstrncpy(pDnodeEp->ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
|
||||
|
||||
cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
|
||||
if (!dnodePort || dnodePort->type != cJSON_Number) {
|
||||
dError("failed to read %s since dnodePort not found", pMgmt->file);
|
||||
goto PRASE_DNODE_OVER;
|
||||
}
|
||||
pDnodeEp->port = dnodePort->valueint;
|
||||
|
||||
pDnodeEp->ep.port = dnodePort->valueint;
|
||||
|
||||
cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
|
||||
if (!isMnode || isMnode->type != cJSON_Number) {
|
||||
|
@ -298,7 +300,8 @@ PRASE_DNODE_OVER:
|
|||
pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp));
|
||||
pMgmt->dnodeEps->num = 1;
|
||||
pMgmt->dnodeEps->eps[0].isMnode = 1;
|
||||
taosGetFqdnPortFromEp(pDnode->cfg.firstEp, pMgmt->dnodeEps->eps[0].fqdn, &pMgmt->dnodeEps->eps[0].port);
|
||||
|
||||
taosGetFqdnPortFromEp(pDnode->cfg.firstEp, &(pMgmt->dnodeEps->eps[0].ep));
|
||||
}
|
||||
|
||||
dndResetDnodes(pDnode, pMgmt->dnodeEps);
|
||||
|
@ -329,8 +332,8 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
|
|||
for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) {
|
||||
SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
|
||||
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id);
|
||||
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->fqdn);
|
||||
len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->port);
|
||||
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn);
|
||||
len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->ep.port);
|
||||
len += snprintf(content + len, maxLen - len, " \"isMnode\": %d\n", pDnodeEp->isMnode);
|
||||
if (i < pMgmt->dnodeEps->num - 1) {
|
||||
len += snprintf(content + len, maxLen - len, " },{\n");
|
||||
|
@ -450,7 +453,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
|
|||
pDnodeEps->num = htonl(pDnodeEps->num);
|
||||
for (int32_t i = 0; i < pDnodeEps->num; ++i) {
|
||||
pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id);
|
||||
pDnodeEps->eps[i].port = htons(pDnodeEps->eps[i].port);
|
||||
pDnodeEps->eps[i].ep.port = htons(pDnodeEps->eps[i].ep.port);
|
||||
}
|
||||
|
||||
dndUpdateDnodeEps(pDnode, pDnodeEps);
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tep.h"
|
||||
#include "sut.h"
|
||||
|
||||
static void processClientRsp(void* parent, SRpcMsg* pRsp, SEpSet* pEpSet) {
|
||||
|
@ -61,11 +62,7 @@ void TestClient::Cleanup() {
|
|||
|
||||
SRpcMsg* TestClient::SendReq(SRpcMsg* pReq) {
|
||||
SEpSet epSet = {0};
|
||||
epSet.inUse = 0;
|
||||
epSet.numOfEps = 1;
|
||||
epSet.port[0] = port;
|
||||
memcpy(epSet.fqdn[0], fqdn, TSDB_FQDN_LEN);
|
||||
|
||||
addEpIntoEpSet(&epSet, fqdn, port);
|
||||
rpcSendRequest(clientRpc, &epSet, pReq, NULL);
|
||||
tsem_wait(&sem);
|
||||
|
||||
|
|
|
@ -840,18 +840,18 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
|
|||
pInfo->vgId = htonl(pVgroup->vgId);
|
||||
pInfo->hashBegin = htonl(pVgroup->hashBegin);
|
||||
pInfo->hashEnd = htonl(pVgroup->hashEnd);
|
||||
pInfo->numOfEps = pVgroup->replica;
|
||||
pInfo->epset.numOfEps = pVgroup->replica;
|
||||
for (int32_t gid = 0; gid < pVgroup->replica; ++gid) {
|
||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[gid];
|
||||
SEpAddr *pEpArrr = &pInfo->epAddr[gid];
|
||||
SEp * pEp = &pInfo->epset.eps[gid];
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
||||
if (pDnode != NULL) {
|
||||
memcpy(pEpArrr->fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
pEpArrr->port = htons(pDnode->port);
|
||||
memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
pEp->port = htons(pDnode->port);
|
||||
}
|
||||
mndReleaseDnode(pMnode, pDnode);
|
||||
if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
|
||||
pInfo->inUse = gid;
|
||||
pInfo->epset.inUse = gid;
|
||||
}
|
||||
}
|
||||
vindex++;
|
||||
|
|
|
@ -203,8 +203,8 @@ void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
|
|||
}
|
||||
|
||||
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
|
||||
SEpSet epSet = {.inUse = 0, .numOfEps = 1, .port[0] = pDnode->port};
|
||||
memcpy(epSet.fqdn[0], pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
SEpSet epSet = {0};
|
||||
addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port);
|
||||
return epSet;
|
||||
}
|
||||
|
||||
|
@ -261,8 +261,8 @@ static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t maxEps) {
|
|||
|
||||
SDnodeEp *pEp = &pEps->eps[numOfEps];
|
||||
pEp->id = htonl(pDnode->id);
|
||||
pEp->port = htons(pDnode->port);
|
||||
memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
pEp->ep.port = htons(pDnode->port);
|
||||
memcpy(pEp->ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
pEp->isMnode = 0;
|
||||
if (mndIsMnode(pMnode, pDnode->id)) {
|
||||
pEp->isMnode = 1;
|
||||
|
|
|
@ -237,8 +237,8 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
|
|||
if (pIter == NULL) break;
|
||||
if (pObj->pDnode == NULL) break;
|
||||
|
||||
pEpSet->port[pEpSet->numOfEps] = htons(pObj->pDnode->port);
|
||||
memcpy(pEpSet->fqdn[pEpSet->numOfEps], pObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
pEpSet->eps[pEpSet->numOfEps].port = htons(pObj->pDnode->port);
|
||||
memcpy(pEpSet->eps[pEpSet->numOfEps].fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
if (pObj->role == TAOS_SYNC_STATE_LEADER) {
|
||||
pEpSet->inUse = pEpSet->numOfEps;
|
||||
}
|
||||
|
|
|
@ -208,12 +208,12 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
|
|||
SArray *pArray;
|
||||
SArray *inner = taosArrayGet(pDag->pSubplans, 0);
|
||||
SSubplan *plan = taosArrayGetP(inner, 0);
|
||||
plan->execNode.inUse = 0;
|
||||
strcpy(plan->execNode.epAddr[0].fqdn, "localhost");
|
||||
plan->execNode.epAddr[0].port = 6030;
|
||||
plan->execNode.nodeId = 2;
|
||||
plan->execNode.numOfEps = 1;
|
||||
|
||||
plan->execNode.nodeId = 2;
|
||||
SEpSet* pEpSet = &plan->execNode.epset;
|
||||
|
||||
pEpSet->inUse = 0;
|
||||
addEpIntoEpSet(pEpSet, "localhost", 6030);
|
||||
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -225,7 +225,8 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
|
|||
CEp.consumerId = -1;
|
||||
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
|
||||
STaskInfo *pTaskInfo = taosArrayGet(pArray, i);
|
||||
tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr);
|
||||
CEp.epSet = pTaskInfo->addr.epset;
|
||||
|
||||
/*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1],
|
||||
* CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/
|
||||
CEp.vgId = pTaskInfo->addr.nodeId;
|
||||
|
|
|
@ -424,9 +424,7 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) {
|
|||
epset.inUse = epset.numOfEps;
|
||||
}
|
||||
|
||||
epset.port[epset.numOfEps] = pDnode->port;
|
||||
memcpy(&epset.fqdn[epset.numOfEps], pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
epset.numOfEps++;
|
||||
addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port);
|
||||
mndReleaseDnode(pMnode, pDnode);
|
||||
}
|
||||
|
||||
|
|
|
@ -277,9 +277,9 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
|
|||
EXPECT_GT(pInfo->vgId, 0);
|
||||
EXPECT_EQ(pInfo->hashBegin, 0);
|
||||
EXPECT_EQ(pInfo->hashEnd, UINT32_MAX / 2 - 1);
|
||||
EXPECT_EQ(pInfo->inUse, 0);
|
||||
EXPECT_EQ(pInfo->numOfEps, 1);
|
||||
SEpAddr* pAddr = &pInfo->epAddr[0];
|
||||
EXPECT_EQ(pInfo->epset.inUse, 0);
|
||||
EXPECT_EQ(pInfo->epset.numOfEps, 1);
|
||||
SEp* pAddr = &pInfo->epset.eps[0];
|
||||
pAddr->port = htons(pAddr->port);
|
||||
EXPECT_EQ(pAddr->port, 9030);
|
||||
EXPECT_STREQ(pAddr->fqdn, "localhost");
|
||||
|
@ -293,9 +293,9 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
|
|||
EXPECT_GT(pInfo->vgId, 0);
|
||||
EXPECT_EQ(pInfo->hashBegin, UINT32_MAX / 2);
|
||||
EXPECT_EQ(pInfo->hashEnd, UINT32_MAX);
|
||||
EXPECT_EQ(pInfo->inUse, 0);
|
||||
EXPECT_EQ(pInfo->numOfEps, 1);
|
||||
SEpAddr* pAddr = &pInfo->epAddr[0];
|
||||
EXPECT_EQ(pInfo->epset.inUse, 0);
|
||||
EXPECT_EQ(pInfo->epset.numOfEps, 1);
|
||||
SEp* pAddr = &pInfo->epset.eps[0];
|
||||
pAddr->port = htons(pAddr->port);
|
||||
EXPECT_EQ(pAddr->port, 9030);
|
||||
EXPECT_STREQ(pAddr->fqdn, "localhost");
|
||||
|
|
|
@ -44,7 +44,7 @@ TEST_F(MndTestProfile, 01_ConnectMsg) {
|
|||
pRsp->acctId = htonl(pRsp->acctId);
|
||||
pRsp->clusterId = htobe64(pRsp->clusterId);
|
||||
pRsp->connId = htonl(pRsp->connId);
|
||||
pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]);
|
||||
pRsp->epSet.eps[0].port = htons(pRsp->epSet.eps[0].port);
|
||||
|
||||
EXPECT_EQ(pRsp->acctId, 1);
|
||||
EXPECT_GT(pRsp->clusterId, 0);
|
||||
|
@ -53,8 +53,8 @@ TEST_F(MndTestProfile, 01_ConnectMsg) {
|
|||
|
||||
EXPECT_EQ(pRsp->epSet.inUse, 0);
|
||||
EXPECT_EQ(pRsp->epSet.numOfEps, 1);
|
||||
EXPECT_EQ(pRsp->epSet.port[0], 9031);
|
||||
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
|
||||
EXPECT_EQ(pRsp->epSet.eps[0].port, 9031);
|
||||
EXPECT_STREQ(pRsp->epSet.eps[0].fqdn, "localhost");
|
||||
|
||||
connId = pRsp->connId;
|
||||
}
|
||||
|
|
|
@ -65,7 +65,6 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S
|
|||
|
||||
int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
|
||||
char *msg = NULL;
|
||||
SEpSet *pVnodeEpSet = NULL;
|
||||
int32_t msgLen = 0;
|
||||
|
||||
ctgDebug("try to get db vgroup from mnode, db:%s", input->db);
|
||||
|
@ -216,17 +215,6 @@ int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableN
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
|
||||
epSet->inUse = 0;
|
||||
epSet->numOfEps = vgroupInfo->numOfEps;
|
||||
|
||||
for (int32_t i = 0; i < vgroupInfo->numOfEps; ++i) {
|
||||
memcpy(&epSet->port[i], &vgroupInfo->epAddr[i].port, sizeof(epSet->port[i]));
|
||||
memcpy(&epSet->fqdn[i], &vgroupInfo->epAddr[i].fqdn, sizeof(epSet->fqdn[i]));
|
||||
}
|
||||
}
|
||||
|
||||
int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, char* tbFullName, STableMetaOutput* output) {
|
||||
SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName};
|
||||
char *msg = NULL;
|
||||
|
@ -292,7 +280,6 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter,
|
|||
|
||||
SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = (char *)tNameGetTableName(pTableName)};
|
||||
char *msg = NULL;
|
||||
SEpSet *pVnodeEpSet = NULL;
|
||||
int32_t msgLen = 0;
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen);
|
||||
|
@ -308,10 +295,7 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter,
|
|||
};
|
||||
|
||||
SRpcMsg rpcRsp = {0};
|
||||
SEpSet epSet;
|
||||
|
||||
ctgGenEpSet(&epSet, vgroupInfo);
|
||||
rpcSendRecv(pTransporter, &epSet, &rpcMsg, &rpcRsp);
|
||||
rpcSendRecv(pTransporter, &vgroupInfo->epset, &rpcMsg, &rpcRsp);
|
||||
|
||||
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
|
||||
if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
|
||||
|
|
|
@ -195,10 +195,10 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
|
|||
vgInfo.vgId = i + 1;
|
||||
vgInfo.hashBegin = i * hashUnit;
|
||||
vgInfo.hashEnd = hashUnit * (i + 1) - 1;
|
||||
vgInfo.numOfEps = i % TSDB_MAX_REPLICA + 1;
|
||||
vgInfo.inUse = i % vgInfo.numOfEps;
|
||||
for (int32_t n = 0; n < vgInfo.numOfEps; ++n) {
|
||||
SEpAddr *addr = &vgInfo.epAddr[n];
|
||||
vgInfo.epset.numOfEps = i % TSDB_MAX_REPLICA + 1;
|
||||
vgInfo.epset.inUse = i % vgInfo.epset.numOfEps;
|
||||
for (int32_t n = 0; n < vgInfo.epset.numOfEps; ++n) {
|
||||
SEp *addr = &vgInfo.epset.eps[n];
|
||||
strcpy(addr->fqdn, "a0");
|
||||
addr->port = htons(n + 22);
|
||||
}
|
||||
|
@ -229,10 +229,10 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
|
|||
vg->vgId = htonl(i + 1);
|
||||
vg->hashBegin = htonl(i * hashUnit);
|
||||
vg->hashEnd = htonl(hashUnit * (i + 1) - 1);
|
||||
vg->numOfEps = i % TSDB_MAX_REPLICA + 1;
|
||||
vg->inUse = i % vg->numOfEps;
|
||||
for (int32_t n = 0; n < vg->numOfEps; ++n) {
|
||||
SEpAddr *addr = &vg->epAddr[n];
|
||||
vg->epset.numOfEps = i % TSDB_MAX_REPLICA + 1;
|
||||
vg->epset.inUse = i % vg->epset.numOfEps;
|
||||
for (int32_t n = 0; n < vg->epset.numOfEps; ++n) {
|
||||
SEp *addr = &vg->epset.eps[n];
|
||||
strcpy(addr->fqdn, "a0");
|
||||
addr->port = htons(n + 22);
|
||||
}
|
||||
|
@ -693,7 +693,7 @@ TEST(tableMeta, normalTable) {
|
|||
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(vgInfo.vgId, 8);
|
||||
ASSERT_EQ(vgInfo.numOfEps, 3);
|
||||
ASSERT_EQ(vgInfo.epset.numOfEps, 3);
|
||||
|
||||
ctgTestSetPrepareTableMeta();
|
||||
|
||||
|
@ -983,7 +983,7 @@ TEST(tableDistVgroup, normalTable) {
|
|||
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
|
||||
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
|
||||
ASSERT_EQ(vgInfo->vgId, 8);
|
||||
ASSERT_EQ(vgInfo->numOfEps, 3);
|
||||
ASSERT_EQ(vgInfo->epset.numOfEps, 3);
|
||||
|
||||
catalogDestroy();
|
||||
}
|
||||
|
@ -1015,7 +1015,7 @@ TEST(tableDistVgroup, childTableCase) {
|
|||
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
|
||||
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
|
||||
ASSERT_EQ(vgInfo->vgId, 9);
|
||||
ASSERT_EQ(vgInfo->numOfEps, 4);
|
||||
ASSERT_EQ(vgInfo->epset.numOfEps, 4);
|
||||
|
||||
catalogDestroy();
|
||||
}
|
||||
|
@ -1046,13 +1046,13 @@ TEST(tableDistVgroup, superTableCase) {
|
|||
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 10);
|
||||
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
|
||||
ASSERT_EQ(vgInfo->vgId, 1);
|
||||
ASSERT_EQ(vgInfo->numOfEps, 1);
|
||||
ASSERT_EQ(vgInfo->epset.numOfEps, 1);
|
||||
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 1);
|
||||
ASSERT_EQ(vgInfo->vgId, 2);
|
||||
ASSERT_EQ(vgInfo->numOfEps, 2);
|
||||
ASSERT_EQ(vgInfo->epset.numOfEps, 2);
|
||||
vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 2);
|
||||
ASSERT_EQ(vgInfo->vgId, 3);
|
||||
ASSERT_EQ(vgInfo->numOfEps, 3);
|
||||
ASSERT_EQ(vgInfo->epset.numOfEps, 3);
|
||||
|
||||
catalogDestroy();
|
||||
}
|
||||
|
@ -1088,14 +1088,14 @@ TEST(dbVgroup, getSetDbVgroupCase) {
|
|||
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(vgInfo.vgId, 8);
|
||||
ASSERT_EQ(vgInfo.numOfEps, 3);
|
||||
ASSERT_EQ(vgInfo.epset.numOfEps, 3);
|
||||
|
||||
code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
|
||||
pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
|
||||
ASSERT_EQ(pvgInfo->vgId, 8);
|
||||
ASSERT_EQ(pvgInfo->numOfEps, 3);
|
||||
ASSERT_EQ(pvgInfo->epset.numOfEps, 3);
|
||||
taosArrayDestroy(vgList);
|
||||
|
||||
ctgTestBuildDBVgroup(&dbVgroup);
|
||||
|
@ -1105,14 +1105,14 @@ TEST(dbVgroup, getSetDbVgroupCase) {
|
|||
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(vgInfo.vgId, 7);
|
||||
ASSERT_EQ(vgInfo.numOfEps, 2);
|
||||
ASSERT_EQ(vgInfo.epset.numOfEps, 2);
|
||||
|
||||
code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1);
|
||||
pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0);
|
||||
ASSERT_EQ(pvgInfo->vgId, 8);
|
||||
ASSERT_EQ(pvgInfo->numOfEps, 3);
|
||||
ASSERT_EQ(pvgInfo->epset.numOfEps, 3);
|
||||
taosArrayDestroy(vgList);
|
||||
|
||||
catalogDestroy();
|
||||
|
|
|
@ -5163,14 +5163,9 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
|||
|
||||
SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
|
||||
|
||||
SEpSet epSet = {0};
|
||||
epSet.numOfEps = pSource->addr.numOfEps;
|
||||
epSet.port[0] = pSource->addr.epAddr[0].port;
|
||||
tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0]));
|
||||
|
||||
int64_t startTs = taosGetTimestampUs();
|
||||
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, epSet.fqdn[0], pSource->taskId, pExchangeInfo->current, totalSources);
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epset.eps[0].fqdn, pSource->taskId, pExchangeInfo->current, totalSources);
|
||||
|
||||
pMsg->header.vgId = htonl(pSource->addr.nodeId);
|
||||
pMsg->sId = htobe64(pSource->schedId);
|
||||
|
@ -5192,7 +5187,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
|||
pMsgSendInfo->fp = loadRemoteDataCallback;
|
||||
|
||||
int64_t transporterId = 0;
|
||||
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo);
|
||||
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epset, &transporterId, pMsgSendInfo);
|
||||
tsem_wait(&pExchangeInfo->ready);
|
||||
|
||||
SRetrieveTableRsp* pRsp = pExchangeInfo->pRsp;
|
||||
|
|
|
@ -3644,6 +3644,7 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
//TODO remove it
|
||||
int32_t setTableVgroupList(SParseContext *pCtx, SName* name, SVgroupsInfo **pVgList) {
|
||||
SArray* vgroupList = NULL;
|
||||
int32_t code = catalogGetTableDistVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &vgroupList);
|
||||
|
@ -3651,21 +3652,17 @@ int32_t setTableVgroupList(SParseContext *pCtx, SName* name, SVgroupsInfo **pVgL
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t vgroupNum = taosArrayGetSize(vgroupList);
|
||||
size_t vgroupNum = taosArrayGetSize(vgroupList);
|
||||
|
||||
SVgroupsInfo *vgList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * vgroupNum);
|
||||
|
||||
SVgroupsInfo *vgList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupNum);
|
||||
vgList->numOfVgroups = vgroupNum;
|
||||
|
||||
for (int32_t i = 0; i < vgroupNum; ++i) {
|
||||
SVgroupInfo *vg = taosArrayGet(vgroupList, i);
|
||||
vgList->vgroups[i].vgId = vg->vgId;
|
||||
vgList->vgroups[i].numOfEps = vg->numOfEps;
|
||||
memcpy(vgList->vgroups[i].epAddr, vg->epAddr, sizeof(vgList->vgroups[i].epAddr));
|
||||
vgList->vgroups[i] = *vg;
|
||||
}
|
||||
|
||||
*pVgList = vgList;
|
||||
|
||||
taosArrayDestroy(vgroupList);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -58,13 +58,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out
|
|||
|
||||
SVgroupInfo* info = taosArrayGet(array, 0);
|
||||
pShowReq->head.vgId = htonl(info->vgId);
|
||||
pEpSet->numOfEps = info->numOfEps;
|
||||
pEpSet->inUse = info->inUse;
|
||||
|
||||
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
|
||||
strncpy(pEpSet->fqdn[i], info->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i]));
|
||||
pEpSet->port[i] = info->epAddr[i].port;
|
||||
}
|
||||
*pEpSet = info->epset;
|
||||
|
||||
*outputLen = sizeof(SVShowTablesReq);
|
||||
*output = pShowReq;
|
||||
|
|
|
@ -1426,35 +1426,6 @@ bool isQueryWithLimit(SQueryStmtInfo* pQueryInfo) {
|
|||
return false;
|
||||
}
|
||||
|
||||
SVgroupsInfo* vgroupInfoClone(SVgroupsInfo *vgroupList) {
|
||||
if (vgroupList == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
size_t size = sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * vgroupList->numOfVgroups;
|
||||
SVgroupsInfo* pNew = malloc(size);
|
||||
if (pNew == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pNew->numOfVgroups = vgroupList->numOfVgroups;
|
||||
|
||||
for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) {
|
||||
SVgroupMsg* pNewVInfo = &pNew->vgroups[i];
|
||||
|
||||
SVgroupMsg* pvInfo = &vgroupList->vgroups[i];
|
||||
pNewVInfo->vgId = pvInfo->vgId;
|
||||
pNewVInfo->numOfEps = pvInfo->numOfEps;
|
||||
|
||||
for(int32_t j = 0; j < pvInfo->numOfEps; ++j) {
|
||||
pNewVInfo->epAddr[j].port = pvInfo->epAddr[j].port;
|
||||
tstrncpy(pNewVInfo->epAddr[j].fqdn, pvInfo->epAddr[j].fqdn, TSDB_FQDN_LEN);
|
||||
}
|
||||
}
|
||||
|
||||
return pNew;
|
||||
}
|
||||
|
||||
void* vgroupInfoClear(SVgroupsInfo *vgroupList) {
|
||||
if (vgroupList == NULL) {
|
||||
return NULL;
|
||||
|
@ -1505,19 +1476,6 @@ STableMeta* tableMetaDup(const STableMeta* pTableMeta) {
|
|||
return p;
|
||||
}
|
||||
|
||||
SVgroupsInfo* vgroupsInfoDup(SVgroupsInfo* pVgroupsInfo) {
|
||||
assert(pVgroupsInfo != NULL);
|
||||
|
||||
size_t size = sizeof(SVgroupMsg) * pVgroupsInfo->numOfVgroups + sizeof(SVgroupsInfo);
|
||||
SVgroupsInfo* pInfo = calloc(1, size);
|
||||
pInfo->numOfVgroups = pVgroupsInfo->numOfVgroups;
|
||||
for (int32_t m = 0; m < pVgroupsInfo->numOfVgroups; ++m) {
|
||||
memcpy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m], sizeof(SVgroupMsg));
|
||||
}
|
||||
|
||||
return pInfo;
|
||||
}
|
||||
|
||||
int32_t getNumOfOutput(SFieldInfo* pFieldInfo) {
|
||||
return pFieldInfo->numOfOutput;
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "mockCatalogService.h"
|
||||
|
||||
#include "tep.h"
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
#include <map>
|
||||
|
@ -39,7 +40,16 @@ public:
|
|||
|
||||
virtual TableBuilder& setVgid(int16_t vgid) {
|
||||
schema()->vgId = vgid;
|
||||
meta_->vgs.emplace_back(SVgroupInfo{.vgId = vgid, .hashBegin = 0, .hashEnd = 0, .inUse = 0, .numOfEps = 3, .epAddr = {{"dnode_1", 6030}, {"dnode_2", 6030}, {"dnode_3", 6030}}});
|
||||
|
||||
SVgroupInfo vgroup = {.vgId = vgid, .hashBegin = 0, .hashEnd = 0, };
|
||||
|
||||
vgroup.epset.eps[0] = (SEp){"dnode_1", 6030};
|
||||
vgroup.epset.eps[1] = (SEp){"dnode_2", 6030};
|
||||
vgroup.epset.eps[2] = (SEp){"dnode_3", 6030};
|
||||
vgroup.epset.inUse = 0;
|
||||
vgroup.epset.numOfEps = 3;
|
||||
|
||||
meta_->vgs.emplace_back(vgroup);
|
||||
return *this;
|
||||
}
|
||||
|
||||
|
@ -112,9 +122,7 @@ public:
|
|||
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const {
|
||||
// todo
|
||||
vgInfo->vgId = 1;
|
||||
vgInfo->numOfEps = 1;
|
||||
vgInfo->epAddr[0].port = 6030;
|
||||
strcpy(vgInfo->epAddr[0].fqdn, "node1");
|
||||
addEpIntoEpSet(&vgInfo->epset, "node1", 6030);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -133,9 +141,16 @@ public:
|
|||
meta_[db][tbname].reset(new MockTableMeta());
|
||||
meta_[db][tbname]->schema.reset(table.release());
|
||||
meta_[db][tbname]->schema->uid = id_++;
|
||||
meta_[db][tbname]->vgs.emplace_back((SVgroupInfo){.vgId = vgid, .hashBegin = 0, .hashEnd = 0, .inUse = 0, .numOfEps = 3, .epAddr = {{"dnode_1", 6030}, {"dnode_2", 6030}, {"dnode_3", 6030}}});
|
||||
|
||||
SVgroupInfo vgroup = {.vgId = vgid, .hashBegin = 0, .hashEnd = 0,};
|
||||
addEpIntoEpSet(&vgroup.epset, "dnode_1", 6030);
|
||||
addEpIntoEpSet(&vgroup.epset, "dnode_2", 6030);
|
||||
addEpIntoEpSet(&vgroup.epset, "dnode_3", 6030);
|
||||
vgroup.epset.inUse = 0;
|
||||
|
||||
meta_[db][tbname]->vgs.emplace_back(vgroup);
|
||||
// super table
|
||||
meta_[db][stbname]->vgs.emplace_back((SVgroupInfo){.vgId = vgid, .hashBegin = 0, .hashEnd = 0, .inUse = 0, .numOfEps = 3, .epAddr = {{"dnode_1", 6030}, {"dnode_2", 6030}, {"dnode_3", 6030}}});
|
||||
meta_[db][stbname]->vgs.emplace_back(vgroup);
|
||||
}
|
||||
|
||||
void showTables() const {
|
||||
|
|
|
@ -251,24 +251,9 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
|
|||
return subplan;
|
||||
}
|
||||
|
||||
static void vgroupInfoToEpSet(const SVgroupInfo* vg, SQueryNodeAddr* execNode) {
|
||||
execNode->nodeId = vg->vgId;
|
||||
execNode->inUse = vg->inUse;
|
||||
execNode->numOfEps = vg->numOfEps;
|
||||
for (int8_t i = 0; i < vg->numOfEps; ++i) {
|
||||
execNode->epAddr[i] = vg->epAddr[i];
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) {
|
||||
execNode->nodeId = vg->vgId;
|
||||
execNode->inUse = 0; // todo
|
||||
execNode->numOfEps = vg->numOfEps;
|
||||
for (int8_t i = 0; i < vg->numOfEps; ++i) {
|
||||
execNode->epAddr[i] = vg->epAddr[i];
|
||||
}
|
||||
return;
|
||||
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
|
||||
pNodeAddr->nodeId = vg->vgId;
|
||||
pNodeAddr->epset = vg->epset;
|
||||
}
|
||||
|
||||
static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo) {
|
||||
|
@ -277,7 +262,8 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod
|
|||
STORE_CURRENT_SUBPLAN(pCxt);
|
||||
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
|
||||
subplan->msgType = TDMT_VND_QUERY;
|
||||
vgroupMsgToEpSet(&(pTableInfo->pMeta->vgroupList->vgroups[i]), &subplan->execNode);
|
||||
|
||||
vgroupInfoToNodeAddr(&(pTableInfo->pMeta->vgroupList->vgroups[i]), &subplan->execNode);
|
||||
subplan->pNode = createMultiTableScanNode(pPlanNode, pTableInfo);
|
||||
subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode, subplan->pNode);
|
||||
RECOVERY_CURRENT_SUBPLAN(pCxt);
|
||||
|
@ -297,11 +283,12 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) {
|
|||
return (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType);
|
||||
}
|
||||
|
||||
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) {
|
||||
vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode);
|
||||
|
||||
// TODO: the SVgroupInfo index
|
||||
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo, SSubplan* subplan) {
|
||||
SVgroupsInfo* pVgroupsInfo = pTableInfo->pMeta->vgroupList;
|
||||
vgroupInfoToNodeAddr(&(pVgroupsInfo->vgroups[0]), &subplan->execNode);
|
||||
int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan;
|
||||
return createUserTableScanNode(pPlanNode, pTable, type);
|
||||
return createUserTableScanNode(pPlanNode, pTableInfo, type);
|
||||
}
|
||||
|
||||
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||
|
@ -374,7 +361,7 @@ static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlan
|
|||
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
|
||||
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i);
|
||||
|
||||
vgroupInfoToEpSet(&blocks->vg, &subplan->execNode);
|
||||
subplan->execNode.epset = blocks->vg.epset;
|
||||
subplan->pDataSink = createDataInserter(pCxt, blocks, NULL);
|
||||
subplan->pNode = NULL;
|
||||
subplan->type = QUERY_TYPE_MODIFY;
|
||||
|
|
|
@ -736,7 +736,7 @@ static const char* jkEpAddrFqdn = "Fqdn";
|
|||
static const char* jkEpAddrPort = "Port";
|
||||
|
||||
static bool epAddrToJson(const void* obj, cJSON* json) {
|
||||
const SEpAddr* ep = (const SEpAddr*)obj;
|
||||
const SEp* ep = (const SEp*)obj;
|
||||
bool res = cJSON_AddStringToObject(json, jkEpAddrFqdn, ep->fqdn);
|
||||
if (res) {
|
||||
res = cJSON_AddNumberToObject(json, jkEpAddrPort, ep->port);
|
||||
|
@ -745,7 +745,7 @@ static bool epAddrToJson(const void* obj, cJSON* json) {
|
|||
}
|
||||
|
||||
static bool epAddrFromJson(const cJSON* json, void* obj) {
|
||||
SEpAddr* ep = (SEpAddr*)obj;
|
||||
SEp* ep = (SEp*)obj;
|
||||
copyString(json, jkEpAddrFqdn, ep->fqdn);
|
||||
ep->port = getNumber(json, jkEpAddrPort);
|
||||
return true;
|
||||
|
@ -763,11 +763,11 @@ static bool queryNodeAddrToJson(const void* obj, cJSON* json) {
|
|||
bool res = cJSON_AddNumberToObject(json, jkNodeAddrId, pAddr->nodeId);
|
||||
|
||||
if (res) {
|
||||
res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, pAddr->inUse);
|
||||
res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, pAddr->epset.inUse);
|
||||
}
|
||||
|
||||
if (res) {
|
||||
res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, pAddr->epAddr, sizeof(SEpAddr), pAddr->numOfEps);
|
||||
res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, pAddr->epset.eps, sizeof(SEp), pAddr->epset.numOfEps);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
@ -776,11 +776,11 @@ static bool queryNodeAddrFromJson(const cJSON* json, void* obj) {
|
|||
SQueryNodeAddr* pAddr = (SQueryNodeAddr*) obj;
|
||||
|
||||
pAddr->nodeId = getNumber(json, jkNodeAddrId);
|
||||
pAddr->inUse = getNumber(json, jkNodeAddrInUse);
|
||||
pAddr->epset.inUse = getNumber(json, jkNodeAddrInUse);
|
||||
|
||||
int32_t numOfEps = 0;
|
||||
bool res = fromRawArray(json, jkNodeAddrEpAddrs, epAddrFromJson, pAddr->epAddr, sizeof(SEpAddr), &numOfEps);
|
||||
pAddr->numOfEps = numOfEps;
|
||||
bool res = fromRawArray(json, jkNodeAddrEpAddrs, epAddrFromJson, pAddr->epset.eps, sizeof(SEp), &numOfEps);
|
||||
pAddr->epset.numOfEps = numOfEps;
|
||||
return res;
|
||||
}
|
||||
|
||||
|
|
|
@ -124,12 +124,10 @@ private:
|
|||
}
|
||||
|
||||
void copyStorageMeta(SVgroupsInfo** dst, const std::vector<SVgroupInfo>& src) {
|
||||
*dst = (SVgroupsInfo*)myCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * src.size());
|
||||
*dst = (SVgroupsInfo*)myCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * src.size());
|
||||
(*dst)->numOfVgroups = src.size();
|
||||
for (int32_t i = 0; i < src.size(); ++i) {
|
||||
(*dst)->vgroups[i].vgId = src[i].vgId;
|
||||
(*dst)->vgroups[i].numOfEps = src[i].numOfEps;
|
||||
memcpy((*dst)->vgroups[i].epAddr, src[i].epAddr, src[i].numOfEps);
|
||||
(*dst)->vgroups[i] = src[i];
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -127,8 +127,8 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
|
|||
pRsp->vgroupInfo[i].hashBegin = ntohl(pRsp->vgroupInfo[i].hashBegin);
|
||||
pRsp->vgroupInfo[i].hashEnd = ntohl(pRsp->vgroupInfo[i].hashEnd);
|
||||
|
||||
for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) {
|
||||
pRsp->vgroupInfo[i].epAddr[n].port = ntohs(pRsp->vgroupInfo[i].epAddr[n].port);
|
||||
for (int32_t n = 0; n < pRsp->vgroupInfo[i].epset.numOfEps; ++n) {
|
||||
pRsp->vgroupInfo[i].epset.eps[n].port = ntohs(pRsp->vgroupInfo[i].epset.eps[n].port);
|
||||
}
|
||||
|
||||
if (0 != taosHashPut(pOut->dbVgroup.vgInfo, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) {
|
||||
|
|
|
@ -417,13 +417,13 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
|||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
if (pTask->plan->execNode.numOfEps > 0) {
|
||||
if (pTask->plan->execNode.epset.numOfEps > 0) {
|
||||
if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
|
||||
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.numOfEps);
|
||||
SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epset.numOfEps);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -446,7 +446,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
|||
}
|
||||
|
||||
if (addNum <= 0) {
|
||||
SCH_TASK_ELOG("no available execNode as candidate addr, nodeNum:%d", nodeNum);
|
||||
SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
|
@ -1050,31 +1050,19 @@ _return:
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) {
|
||||
epSet->inUse = addr->inUse;
|
||||
epSet->numOfEps = addr->numOfEps;
|
||||
|
||||
for (int8_t i = 0; i < epSet->numOfEps; ++i) {
|
||||
strncpy(epSet->fqdn[i], addr->epAddr[i].fqdn, sizeof(addr->epAddr[i].fqdn));
|
||||
epSet->port[i] = addr->epAddr[i].port;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
|
||||
uint32_t msgSize = 0;
|
||||
void *msg = NULL;
|
||||
int32_t code = 0;
|
||||
bool isCandidateAddr = false;
|
||||
SEpSet epSet;
|
||||
|
||||
if (NULL == addr) {
|
||||
addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx));
|
||||
|
||||
isCandidateAddr = true;
|
||||
}
|
||||
|
||||
schConvertAddrToEpSet(addr, &epSet);
|
||||
|
||||
SEpSet epSet = addr->epset;
|
||||
|
||||
switch (msgType) {
|
||||
case TDMT_VND_CREATE_TABLE:
|
||||
case TDMT_VND_SUBMIT: {
|
||||
|
@ -1218,8 +1206,6 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
|
|||
SCH_TASK_ELOG("subplanToString error, code:%x, msg:%p, len:%d", code, pTask->msg, pTask->msgLen);
|
||||
SCH_ERR_JRET(code);
|
||||
}
|
||||
|
||||
// printf("physical plan:%s\n", pTask->msg);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
|
||||
|
@ -1300,7 +1286,7 @@ void schDropJobAllTasks(SSchJob *pJob) {
|
|||
int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) {
|
||||
qDebug("QID:0x%"PRIx64" job started", pDag->queryId);
|
||||
|
||||
if (pNodeList && taosArrayGetSize(pNodeList) <= 0) {
|
||||
if (pNodeList == NULL || (pNodeList && taosArrayGetSize(pNodeList) <= 0)) {
|
||||
qDebug("QID:0x%"PRIx64" input exec nodeList is empty", pDag->queryId);
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,6 @@
|
|||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||
#pragma GCC diagnostic ignored "-Wliteral-suffix"
|
||||
#pragma GCC diagnostic ignored "-Wunused-function"
|
||||
#pragma GCC diagnostic ignored "-Wunused-variable"
|
||||
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||
|
@ -92,11 +91,11 @@ void schtBuildQueryDag(SQueryDag *dag) {
|
|||
scanPlan->id.templateId = 0x0000000000000002;
|
||||
scanPlan->id.subplanId = 0x0000000000000003;
|
||||
scanPlan->type = QUERY_TYPE_SCAN;
|
||||
scanPlan->execNode.numOfEps = 1;
|
||||
|
||||
scanPlan->execNode.nodeId = 1;
|
||||
scanPlan->execNode.inUse = 0;
|
||||
scanPlan->execNode.epAddr[0].port = 6030;
|
||||
strcpy(scanPlan->execNode.epAddr[0].fqdn, "ep0");
|
||||
scanPlan->execNode.epset.inUse = 0;
|
||||
addEpIntoEpSet(&scanPlan->execNode.epset, "ep0", 6030);
|
||||
|
||||
scanPlan->pChildren = NULL;
|
||||
scanPlan->level = 1;
|
||||
scanPlan->pParents = taosArrayInit(1, POINTER_BYTES);
|
||||
|
@ -108,7 +107,8 @@ void schtBuildQueryDag(SQueryDag *dag) {
|
|||
mergePlan->id.subplanId = 0x5555555555;
|
||||
mergePlan->type = QUERY_TYPE_MERGE;
|
||||
mergePlan->level = 0;
|
||||
mergePlan->execNode.numOfEps = 0;
|
||||
mergePlan->execNode.epset.numOfEps = 0;
|
||||
|
||||
mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES);
|
||||
mergePlan->pParents = NULL;
|
||||
mergePlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
|
||||
|
@ -144,11 +144,11 @@ void schtBuildInsertDag(SQueryDag *dag) {
|
|||
insertPlan[0].id.subplanId = 0x0000000000000004;
|
||||
insertPlan[0].type = QUERY_TYPE_MODIFY;
|
||||
insertPlan[0].level = 0;
|
||||
insertPlan[0].execNode.numOfEps = 1;
|
||||
|
||||
insertPlan[0].execNode.nodeId = 1;
|
||||
insertPlan[0].execNode.inUse = 0;
|
||||
insertPlan[0].execNode.epAddr[0].port = 6030;
|
||||
strcpy(insertPlan[0].execNode.epAddr[0].fqdn, "ep0");
|
||||
insertPlan[0].execNode.epset.inUse = 0;
|
||||
addEpIntoEpSet(&insertPlan[0].execNode.epset, "ep0", 6030);
|
||||
|
||||
insertPlan[0].pChildren = NULL;
|
||||
insertPlan[0].pParents = NULL;
|
||||
insertPlan[0].pNode = NULL;
|
||||
|
@ -160,11 +160,11 @@ void schtBuildInsertDag(SQueryDag *dag) {
|
|||
insertPlan[1].id.subplanId = 0x0000000000000005;
|
||||
insertPlan[1].type = QUERY_TYPE_MODIFY;
|
||||
insertPlan[1].level = 0;
|
||||
insertPlan[1].execNode.numOfEps = 1;
|
||||
|
||||
insertPlan[1].execNode.nodeId = 1;
|
||||
insertPlan[1].execNode.inUse = 1;
|
||||
insertPlan[1].execNode.epAddr[0].port = 6030;
|
||||
strcpy(insertPlan[1].execNode.epAddr[0].fqdn, "ep1");
|
||||
insertPlan[1].execNode.epset.inUse = 0;
|
||||
addEpIntoEpSet(&insertPlan[1].execNode.epset, "ep0", 6030);
|
||||
|
||||
insertPlan[1].pChildren = NULL;
|
||||
insertPlan[1].pParents = NULL;
|
||||
insertPlan[1].pNode = NULL;
|
||||
|
@ -371,9 +371,9 @@ void* schtRunJobThread(void *aa) {
|
|||
while (!schtTestStop) {
|
||||
schtBuildQueryDag(&dag);
|
||||
|
||||
SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
|
||||
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
|
||||
|
||||
SEpAddr qnodeAddr = {0};
|
||||
SEp qnodeAddr = {0};
|
||||
strcpy(qnodeAddr.fqdn, "qnode0.ep");
|
||||
qnodeAddr.port = 6031;
|
||||
taosArrayPush(qnodeList, &qnodeAddr);
|
||||
|
@ -523,9 +523,9 @@ TEST(queryTest, normalCase) {
|
|||
|
||||
schtInitLogFile();
|
||||
|
||||
SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
|
||||
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
|
||||
|
||||
SEpAddr qnodeAddr = {0};
|
||||
SEp qnodeAddr = {0};
|
||||
strcpy(qnodeAddr.fqdn, "qnode0.ep");
|
||||
qnodeAddr.port = 6031;
|
||||
taosArrayPush(qnodeList, &qnodeAddr);
|
||||
|
@ -627,9 +627,9 @@ TEST(insertTest, normalCase) {
|
|||
|
||||
schtInitLogFile();
|
||||
|
||||
SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
|
||||
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
|
||||
|
||||
SEpAddr qnodeAddr = {0};
|
||||
SEp qnodeAddr = {0};
|
||||
strcpy(qnodeAddr.fqdn, "qnode0.ep");
|
||||
qnodeAddr.port = 6031;
|
||||
taosArrayPush(qnodeList, &qnodeAddr);
|
||||
|
|
|
@ -814,9 +814,9 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
|
|||
SEpSet * pEpSet = &pContext->epSet;
|
||||
|
||||
pConn =
|
||||
rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType);
|
||||
rpcGetConnFromCache(pRpc->pCache, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType);
|
||||
if (pConn == NULL || pConn->user[0] == 0) {
|
||||
pConn = rpcOpenConn(pRpc, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType);
|
||||
pConn = rpcOpenConn(pRpc, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType);
|
||||
}
|
||||
|
||||
if (pConn) {
|
||||
|
@ -1188,7 +1188,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
|
|||
|
||||
// for UDP, port may be changed by server, the port in epSet shall be used for cache
|
||||
if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
|
||||
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse],
|
||||
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.eps[pContext->epSet.inUse].port,
|
||||
pConn->connType);
|
||||
} else {
|
||||
rpcCloseConn(pConn);
|
||||
|
@ -1202,9 +1202,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
|
|||
tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps,
|
||||
pContext->epSet.inUse);
|
||||
for (int i = 0; i < pContext->epSet.numOfEps; ++i) {
|
||||
pContext->epSet.port[i] = htons(pContext->epSet.port[i]);
|
||||
tDebug("%s, redirect is received, index:%d ep:%s:%u", pConn->info, i, pContext->epSet.fqdn[i],
|
||||
pContext->epSet.port[i]);
|
||||
pContext->epSet.eps[i].port = htons(pContext->epSet.eps[i].port);
|
||||
tDebug("%s, redirect is received, index:%d ep:%s:%u", pConn->info, i, pContext->epSet.eps[i].fqdn,
|
||||
pContext->epSet.eps[i].port);
|
||||
}
|
||||
}
|
||||
rpcSendReqToServer(pRpc, pContext);
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <tep.h>
|
||||
#include "os.h"
|
||||
#include "rpcLog.h"
|
||||
#include "taoserror.h"
|
||||
|
@ -87,12 +88,9 @@ int main(int argc, char *argv[]) {
|
|||
pthread_attr_t thattr;
|
||||
|
||||
// server info
|
||||
epSet.numOfEps = 1;
|
||||
epSet.inUse = 0;
|
||||
epSet.port[0] = 7000;
|
||||
epSet.port[1] = 7000;
|
||||
strcpy(epSet.fqdn[0], serverIp);
|
||||
strcpy(epSet.fqdn[1], "192.168.0.1");
|
||||
addEpIntoEpSet(&epSet, serverIp, 7000);
|
||||
addEpIntoEpSet(&epSet, "192.168.0.1", 7000);
|
||||
|
||||
// client info
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
|
@ -110,9 +108,9 @@ int main(int argc, char *argv[]) {
|
|||
|
||||
for (int i = 1; i < argc; ++i) {
|
||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||
epSet.port[0] = atoi(argv[++i]);
|
||||
epSet.eps[0].port = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
|
||||
tstrncpy(epSet.fqdn[0], argv[++i], sizeof(epSet.fqdn[0]));
|
||||
tstrncpy(epSet.eps[0].fqdn, argv[++i], sizeof(epSet.eps[0].fqdn));
|
||||
} else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
|
||||
rpcInit.numOfThreads = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
|
||||
|
@ -136,7 +134,7 @@ int main(int argc, char *argv[]) {
|
|||
} else {
|
||||
printf("\nusage: %s [options] \n", argv[0]);
|
||||
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
|
||||
printf(" [-p port]: server port number, default is:%d\n", epSet.port[0]);
|
||||
printf(" [-p port]: server port number, default is:%d\n", epSet.eps[0].port);
|
||||
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
|
||||
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
|
||||
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
|
||||
|
|
Loading…
Reference in New Issue