Merge branch 'develop' into feature/add-tag-and-alter-table-cases

This commit is contained in:
Shuduo Sang 2020-05-12 10:47:43 +08:00
commit 9444ded1dc
17 changed files with 140 additions and 156 deletions

View File

@ -365,7 +365,7 @@ void tscInitMsgsFp();
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg); void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet);
int tscProcessSql(SSqlObj *pSql); int tscProcessSql(SSqlObj *pSql);
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId); int tscRenewMeterMeta(SSqlObj *pSql, char *tableId);

View File

@ -221,7 +221,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
SSqlObj *pSql = (SSqlObj *)rpcMsg->handle; SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
if (pSql == NULL) { if (pSql == NULL) {
tscError("%p sql is already released", pSql->signature); tscError("%p sql is already released", pSql->signature);
@ -245,6 +245,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
return; return;
} }
if (pCmd->command < TSDB_SQL_MGMT) {
if (pIpSet) pSql->ipList = *pIpSet;
} else {
if (pIpSet) tscMgmtIpSet = *pIpSet;
}
if (rpcMsg->pCont == NULL) { if (rpcMsg->pCont == NULL) {
rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL; rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
} else { } else {

View File

@ -266,8 +266,8 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
return taosCfgDynamicOptions(pCfg->config); return taosCfgDynamicOptions(pCfg->config);
} }
void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { void dnodeUpdateIpSet(SRpcIpSet *pIpSet) {
dPrint("mnode IP list is changed for ufp is called, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse); dPrint("mnode IP list is changed, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse);
for (int i = 0; i < pIpSet->numOfIps; ++i) { for (int i = 0; i < pIpSet->numOfIps; ++i) {
dPrint("mnode index:%d %s:%u", i, pIpSet->fqdn[i], pIpSet->port[i]) dPrint("mnode index:%d %s:%u", i, pIpSet->fqdn[i], pIpSet->port[i])
} }

View File

@ -29,11 +29,11 @@
#include "dnodeVWrite.h" #include "dnodeVWrite.h"
#include "mnode.h" #include "mnode.h"
extern void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet); extern void dnodeUpdateIpSet(SRpcIpSet *pIpSet);
static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg); static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *);
static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg); static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet);
static void *tsDnodeServerRpc = NULL; static void *tsDnodeServerRpc = NULL;
static void *tsDnodeClientRpc = NULL; static void *tsDnodeClientRpc = NULL;
@ -81,7 +81,7 @@ void dnodeCleanupServer() {
} }
} }
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg) { static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
SRpcMsg rspMsg; SRpcMsg rspMsg;
rspMsg.handle = pMsg->handle; rspMsg.handle = pMsg->handle;
rspMsg.pCont = NULL; rspMsg.pCont = NULL;
@ -119,7 +119,6 @@ int32_t dnodeInitClient() {
rpcInit.label = "DND-C"; rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = dnodeProcessRspFromDnode; rpcInit.cfp = dnodeProcessRspFromDnode;
rpcInit.ufp = dnodeUpdateIpSet;
rpcInit.sessions = 100; rpcInit.sessions = 100;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
@ -145,9 +144,10 @@ void dnodeCleanupClient() {
} }
} }
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg) { static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
if (dnodeProcessRspMsgFp[pMsg->msgType]) { if (dnodeProcessRspMsgFp[pMsg->msgType]) {
if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pIpSet) dnodeUpdateIpSet(pIpSet);
(*dnodeProcessRspMsgFp[pMsg->msgType])(pMsg); (*dnodeProcessRspMsgFp[pMsg->msgType])(pMsg);
} else { } else {
dError("RPC %p, msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); dError("RPC %p, msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]);

View File

@ -28,7 +28,7 @@
#include "dnodeShell.h" #include "dnodeShell.h"
static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessMsgFromShell(SRpcMsg *pMsg); static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *);
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static void * tsDnodeShellRpc = NULL; static void * tsDnodeShellRpc = NULL;
static int32_t tsDnodeQueryReqNum = 0; static int32_t tsDnodeQueryReqNum = 0;
@ -106,7 +106,7 @@ void dnodeCleanupShell() {
} }
} }
void dnodeProcessMsgFromShell(SRpcMsg *pMsg) { void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
rpcMsg.handle = pMsg->handle; rpcMsg.handle = pMsg->handle;
rpcMsg.pCont = NULL; rpcMsg.pCont = NULL;

View File

@ -66,10 +66,7 @@ typedef struct {
char *ckey; // ciphering key char *ckey; // ciphering key
// call back to process incoming msg, code shall be ignored by server app // call back to process incoming msg, code shall be ignored by server app
void (*cfp)(SRpcMsg *); void (*cfp)(SRpcMsg *, SRpcIpSet *);
// call back to process notify the ipSet changes, for client app only
void (*ufp)(void *ahandle, SRpcIpSet *pIpSet);
// call back to retrieve the client auth info, for server app only // call back to retrieve the client auth info, for server app only
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);

View File

@ -85,8 +85,7 @@ typedef struct SSuperTableObj {
int32_t numOfTables; int32_t numOfTables;
int16_t nextColId; int16_t nextColId;
SSchema * schema; SSchema * schema;
int32_t vgLen; void * vgHash;
int32_t * vgList;
} SSuperTableObj; } SSuperTableObj;
typedef struct { typedef struct {

View File

@ -24,6 +24,7 @@
#include "tname.h" #include "tname.h"
#include "tidpool.h" #include "tidpool.h"
#include "tglobal.h" #include "tglobal.h"
#include "hash.h"
#include "dnode.h" #include "dnode.h"
#include "mgmtDef.h" #include "mgmtDef.h"
#include "mgmtInt.h" #include "mgmtInt.h"
@ -363,39 +364,35 @@ static void mgmtCleanUpChildTables() {
} }
static void mgmtAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCtable) { static void mgmtAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCtable) {
if (pStable->vgLen == 0) {
pStable->vgLen = 8;
pStable->vgList = calloc(pStable->vgLen, sizeof(int32_t));
}
bool find = false;
int32_t pos = 0;
for (pos = 0; pos < pStable->vgLen; ++pos) {
if (pStable->vgList[pos] == 0) break;
if (pStable->vgList[pos] == pCtable->vgId) {
find = true;
break;
}
}
if (!find) {
if (pos >= pStable->vgLen) {
pStable->vgLen *= 2;
pStable->vgList = realloc(pStable->vgList, pStable->vgLen * sizeof(int32_t));
}
pStable->vgList[pos] = pCtable->vgId;
}
pStable->numOfTables++; pStable->numOfTables++;
if (pStable->vgHash == NULL) {
pStable->vgHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
}
if (pStable->vgHash != NULL) {
taosHashPut(pStable->vgHash, (char *)&pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId));
}
} }
static void mgmtRemoveTableFromStable(SSuperTableObj *pStable, SChildTableObj *pCtable) { static void mgmtRemoveTableFromStable(SSuperTableObj *pStable, SChildTableObj *pCtable) {
pStable->numOfTables--; pStable->numOfTables--;
if (pStable->vgHash == NULL) return;
SVgObj *pVgroup = mgmtGetVgroup(pCtable->vgId);
if (pVgroup != NULL) {
taosHashRemove(pStable->vgHash, (char *)&pCtable->vgId, sizeof(pCtable->vgId));
}
mgmtDecVgroupRef(pVgroup);
} }
static void mgmtDestroySuperTable(SSuperTableObj *pStable) { static void mgmtDestroySuperTable(SSuperTableObj *pStable) {
if (pStable->vgHash != NULL) {
taosHashCleanup(pStable->vgHash);
pStable->vgHash = NULL;
}
tfree(pStable->schema); tfree(pStable->schema);
tfree(pStable->vgList)
tfree(pStable); tfree(pStable);
} }
@ -434,7 +431,7 @@ static int32_t mgmtSuperTableActionUpdate(SSdbOper *pOper) {
void *oldSchema = pTable->schema; void *oldSchema = pTable->schema;
memcpy(pTable, pNew, pOper->rowSize); memcpy(pTable, pNew, pOper->rowSize);
pTable->schema = pNew->schema; pTable->schema = pNew->schema;
free(pNew->vgList); free(pNew->vgHash);
free(pNew); free(pNew);
free(oldSchema); free(oldSchema);
} }
@ -797,26 +794,26 @@ static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) {
static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) { static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) {
SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable;
if (pStable->numOfTables != 0) { if (pStable->numOfTables != 0) {
mgmtDropAllChildTablesInStable(pStable); SHashMutableIterator *pIter = taosHashCreateIter(pStable->vgHash);
for (int32_t vg = 0; vg < pStable->vgLen; ++vg) { while (taosHashIterNext(pIter)) {
int32_t vgId = pStable->vgList[vg]; int32_t *pVgId = taosHashIterGet(pIter);
if (vgId == 0) break; SVgObj *pVgroup = mgmtGetVgroup(*pVgId);
SVgObj *pVgroup = mgmtGetVgroup(vgId);
if (pVgroup == NULL) break; if (pVgroup == NULL) break;
SMDDropSTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropSTableMsg)); SMDDropSTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropSTableMsg));
pDrop->contLen = htonl(sizeof(SMDDropSTableMsg)); pDrop->contLen = htonl(sizeof(SMDDropSTableMsg));
pDrop->vgId = htonl(vgId); pDrop->vgId = htonl(pVgroup->vgId);
pDrop->uid = htobe64(pStable->uid); pDrop->uid = htobe64(pStable->uid);
mgmtExtractTableName(pStable->info.tableId, pDrop->tableId); mgmtExtractTableName(pStable->info.tableId, pDrop->tableId);
mPrint("stable:%s, send drop stable msg to vgId:%d", pStable->info.tableId, vgId); mPrint("stable:%s, send drop stable msg to vgId:%d", pStable->info.tableId, pVgroup->vgId);
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SMDDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE}; SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SMDDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE};
dnodeSendMsgToDnode(&ipSet, &rpcMsg); dnodeSendMsgToDnode(&ipSet, &rpcMsg);
mgmtDecVgroupRef(pVgroup); mgmtDecVgroupRef(pVgroup);
} }
mgmtDropAllChildTablesInStable(pStable);
} }
SSdbOper oper = { SSdbOper oper = {
@ -1243,59 +1240,58 @@ static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) {
static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
SCMSTableVgroupMsg *pInfo = pMsg->pCont; SCMSTableVgroupMsg *pInfo = pMsg->pCont;
int32_t numOfTable = htonl(pInfo->numOfTables); int32_t numOfTable = htonl(pInfo->numOfTables);
char* name = (char*) pInfo + sizeof(struct SCMSTableVgroupMsg); // reserve space
SCMSTableVgroupRspMsg *pRsp = NULL; int32_t contLen = sizeof(SCMSTableVgroupRspMsg) + 32 * sizeof(SCMVgroupInfo) + sizeof(SVgroupsInfo);
for (int32_t i = 0; i < numOfTable; ++i) {
// todo set the initial size to be 10, fix me char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN) * i;
int32_t contLen = sizeof(SCMSTableVgroupRspMsg) + (sizeof(SCMVgroupInfo) * 10 + sizeof(SVgroupsInfo))*numOfTable; SSuperTableObj *pTable = mgmtGetSuperTable(stableName);
if (pTable->vgHash != NULL) {
pRsp = rpcMallocCont(contLen); contLen += (taosHashGetSize(pTable->vgHash) * sizeof(SCMVgroupInfo) + sizeof(SVgroupsInfo));
}
mgmtDecTableRef(pTable);
}
SCMSTableVgroupRspMsg *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) { if (pRsp == NULL) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
return; return;
} }
pRsp->numOfTables = htonl(numOfTable); pRsp->numOfTables = htonl(numOfTable);
char* msg = (char*) pRsp + sizeof(SCMSTableVgroupRspMsg); char* msg = (char*) pRsp + sizeof(SCMSTableVgroupRspMsg);
for(int32_t i = 0; i < numOfTable; ++i) { for (int32_t i = 0; i < numOfTable; ++i) {
SSuperTableObj *pTable = mgmtGetSuperTable(name); char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN) * i;
SSuperTableObj *pTable = mgmtGetSuperTable(stableName);
pMsg->pTable = (STableObj *)pTable; SVgroupsInfo *pVgroupInfo = (SVgroupsInfo *)msg;
if (pMsg->pTable == NULL) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); SHashMutableIterator *pIter = taosHashCreateIter(pTable->vgHash);
return; int32_t vgSize = 0;
} while (taosHashIterNext(pIter)) {
int32_t *pVgId = taosHashIterGet(pIter);
SVgroupsInfo* pVgroup = (SVgroupsInfo*) msg; SVgObj * pVgroup = mgmtGetVgroup(*pVgId);
if (pVgroup == NULL) continue;
int32_t vg = 0;
for (; vg < pTable->vgLen; ++vg) { pVgroupInfo->vgroups[vgSize].vgId = htonl(pVgroup->vgId);
int32_t vgId = pTable->vgList[vg]; for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) {
if (vgId == 0) break; SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode;
SVgObj *vgItem = mgmtGetVgroup(vgId);
if (vgItem == NULL) break;
pVgroup->vgroups[vg].vgId = htonl(vgId);
for (int32_t vn = 0; vn < vgItem->numOfVnodes; ++vn) {
SDnodeObj *pDnode = vgItem->vnodeGid[vn].pDnode;
if (pDnode == NULL) break; if (pDnode == NULL) break;
strncpy(pVgroup->vgroups[vg].ipAddr[vn].fqdn, pDnode->dnodeFqdn, tListLen(pDnode->dnodeFqdn)); strncpy(pVgroupInfo->vgroups[vgSize].ipAddr[vn].fqdn, pDnode->dnodeFqdn, tListLen(pDnode->dnodeFqdn));
pVgroup->vgroups[vg].ipAddr[vn].port = htons(tsDnodeShellPort); pVgroupInfo->vgroups[vgSize].ipAddr[vn].port = htons(tsDnodeShellPort);
pVgroup->vgroups[vg].numOfIps++; pVgroupInfo->vgroups[vgSize].numOfIps++;
} }
mgmtDecVgroupRef(vgItem); vgSize++;
mgmtDecVgroupRef(pVgroup);
} }
pVgroup->numOfVgroups = htonl(vg); pVgroupInfo->numOfVgroups = htonl(vgSize);
// one table is done, try the next table // one table is done, try the next table
msg += sizeof(SVgroupsInfo) + vg * sizeof(SCMVgroupInfo); msg += sizeof(SVgroupsInfo) + vgSize * sizeof(SCMVgroupInfo);
} }
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};

View File

@ -55,9 +55,8 @@ typedef struct {
char secret[TSDB_KEY_LEN]; // secret for the link char secret[TSDB_KEY_LEN]; // secret for the link
char ckey[TSDB_KEY_LEN]; // ciphering key char ckey[TSDB_KEY_LEN]; // ciphering key
void (*cfp)(SRpcMsg *); void (*cfp)(SRpcMsg *, SRpcIpSet *);
int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey);
void (*ufp)(void *ahandle, SRpcIpSet *pIpSet);
void *idPool; // handle to ID pool void *idPool; // handle to ID pool
void *tmrCtrl; // handle to timer void *tmrCtrl; // handle to timer
@ -222,7 +221,6 @@ void *rpcOpen(const SRpcInit *pInit) {
if (pInit->secret) strcpy(pRpc->secret, pInit->secret); if (pInit->secret) strcpy(pRpc->secret, pInit->secret);
if (pInit->ckey) strcpy(pRpc->ckey, pInit->ckey); if (pInit->ckey) strcpy(pRpc->ckey, pInit->ckey);
pRpc->spi = pInit->spi; pRpc->spi = pInit->spi;
pRpc->ufp = pInit->ufp;
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
@ -900,10 +898,11 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg));
} else { } else {
// for asynchronous API // for asynchronous API
if (pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect)) SRpcIpSet *pIpSet = NULL;
(*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet if (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect)
pIpSet = &pContext->ipSet;
(*pRpc->cfp)(pMsg); (*pRpc->cfp)(pMsg, pIpSet);
} }
// free the request message // free the request message
@ -924,7 +923,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
rpcMsg.handle = pConn; rpcMsg.handle = pConn;
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer);
(*(pRpc->cfp))(&rpcMsg); (*(pRpc->cfp))(&rpcMsg, NULL);
} else { } else {
// it's a response // it's a response
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;

View File

@ -31,22 +31,16 @@ typedef struct {
void *pRpc; void *pRpc;
} SInfo; } SInfo;
static void processResponse(SRpcMsg *pMsg) { static void processResponse(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
SInfo *pInfo = (SInfo *)pMsg->handle; SInfo *pInfo = (SInfo *)pMsg->handle;
tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code);
if (pIpSet) pInfo->ipSet = *pIpSet;
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
sem_post(&pInfo->rspSem); sem_post(&pInfo->rspSem);
} }
static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
SInfo *pInfo = (SInfo *)handle;
tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse);
pInfo->ipSet = *pIpSet;
}
static int tcount = 0; static int tcount = 0;
static void *sendRequest(void *param) { static void *sendRequest(void *param) {
@ -99,7 +93,6 @@ int main(int argc, char *argv[]) {
rpcInit.label = "APP"; rpcInit.label = "APP";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = processResponse; rpcInit.cfp = processResponse;
rpcInit.ufp = processUpdateIpSet;
rpcInit.sessions = 100; rpcInit.sessions = 100;
rpcInit.idleTime = tsShellActivityTimer*1000; rpcInit.idleTime = tsShellActivityTimer*1000;
rpcInit.user = "michael"; rpcInit.user = "michael";

View File

@ -32,12 +32,6 @@ typedef struct {
void *pRpc; void *pRpc;
} SInfo; } SInfo;
static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
SInfo *pInfo = (SInfo *)handle;
tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse);
pInfo->ipSet = *pIpSet;
}
static int tcount = 0; static int tcount = 0;
static int terror = 0; static int terror = 0;
@ -100,8 +94,6 @@ int main(int argc, char *argv[]) {
rpcInit.localPort = 0; rpcInit.localPort = 0;
rpcInit.label = "APP"; rpcInit.label = "APP";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
// rpcInit.cfp = processResponse;
rpcInit.ufp = processUpdateIpSet;
rpcInit.sessions = 100; rpcInit.sessions = 100;
rpcInit.idleTime = tsShellActivityTimer*1000; rpcInit.idleTime = tsShellActivityTimer*1000;
rpcInit.user = "michael"; rpcInit.user = "michael";

View File

@ -113,7 +113,7 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char
return ret; return ret;
} }
void processRequestMsg(SRpcMsg *pMsg) { void processRequestMsg(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
SRpcMsg *pTemp; SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg)); pTemp = taosAllocateQitem(sizeof(SRpcMsg));

View File

@ -1,6 +1,6 @@
system sh/stop_dnodes.sh system sh/stop_dnodes.sh
$totalVnodes = 100 $totalVnodes = 10
$maxTables = 4 $maxTables = 4
$totalRows = $totalVnodes * $maxTables $totalRows = $totalVnodes * $maxTables

View File

@ -96,25 +96,25 @@ $ts1 = $ts0 + 1000
$ts2 = $ts0 + 2000 $ts2 = $ts0 + 2000
sql insert into tb_1 using $stb tags (-1) values ( $ts1 , 1,1,1,1,'bin',1,1,1,'涛思数据') ( $ts2 , 2,2,2,2,'binar', 1,1,1,'nchar') sql insert into tb_1 using $stb tags (-1) values ( $ts1 , 1,1,1,1,'bin',1,1,1,'涛思数据') ( $ts2 , 2,2,2,2,'binar', 1,1,1,'nchar')
sql select * from $stb sql select * from $stb
if $rows != 3 then if $rows != 5 then
return -1 return -1
endi endi
if $data19 != 涛思数据 then if $data09 != 涛思数据 then
return -1 return -1
endi endi
if $data11 != 1 then if $data01 != 1 then
return -1 return -1
endi endi
if $data22 != 2 then if $data42 != 2 then
return -2 return -2
endi endi
if $data23 != 2.00000 then if $data43 != 2.00000 then
return -1 return -1
endi endi
if $data25 != binar then if $data45 != binar then
return -1 return -1
endi endi
if $data29 != nchar then if $data49 != nchar then
return -1 return -1
endi endi
sql drop table tb_1 sql drop table tb_1
@ -127,22 +127,22 @@ sql select * from $stb
if $rows != 5 then if $rows != 5 then
return -1 return -1
endi endi
if $data19 != 涛思数据 then if $data09 != 涛思数据 then
return -1 return -1
endi endi
if $data11 != 1 then if $data01 != 1 then
return -1 return -1
endi endi
if $data22 != 2 then if $data42 != 2 then
return -2 return -2
endi endi
if $data23 != 2.00000 then if $data43 != 2.00000 then
return -1 return -1
endi endi
if $data25 != binar then if $data45 != binar then
return -1 return -1
endi endi
if $data29 != nchar then if $data49 != nchar then
return -1 return -1
endi endi
@ -154,13 +154,13 @@ sql show tables
if $rows != 3 then if $rows != 3 then
return -1 return -1
endi endi
if $data00 != tb3 then if $data00 != tb1 then
return -1 return -1
endi endi
if $data10 != tb2 then if $data10 != tb2 then
return -1 return -1
endi endi
if $data20 != tb1 then if $data20 != tb3 then
return -1 return -1
endi endi

View File

@ -42,7 +42,7 @@ sql select count(*), last(ts), min(k), max(k), avg(k) from db.mt where a=0 and t
print =================== step2 print =================== step2
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 10000 sleep 5000
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
sleep 3000 sleep 3000

View File

@ -1,9 +1,6 @@
cd ../../debug; cmake .. cd ../../debug; cmake ..
#cd ../../debug; make clean
cd ../../debug; make cd ../../debug; make
cd ../../../debug; cmake .. cd ../../../debug; cmake ..
#cd ../../../debug; make clean
cd ../../../debug; make cd ../../../debug; make
#./test.sh -f general/alter/cached_schema_after_alter.sim #./test.sh -f general/alter/cached_schema_after_alter.sim
@ -159,9 +156,7 @@ cd ../../../debug; make
#./test.sh -f general/stable/disk.sim #./test.sh -f general/stable/disk.sim
#./test.sh -f general/stable/metrics.sim #./test.sh -f general/stable/metrics.sim
#./test.sh -f general/stable/values.sim #./test.sh -f general/stable/values.sim
#./test.sh -f general/stable/vnode3.sim ./test.sh -f general/stable/vnode3.sim
#stream
./test.sh -f general/table/autocreate.sim ./test.sh -f general/table/autocreate.sim
./test.sh -f general/table/basic1.sim ./test.sh -f general/table/basic1.sim
@ -176,12 +171,12 @@ cd ../../../debug; make
./test.sh -f general/table/column2.sim ./test.sh -f general/table/column2.sim
./test.sh -f general/table/date.sim ./test.sh -f general/table/date.sim
./test.sh -f general/table/db.table.sim ./test.sh -f general/table/db.table.sim
#./test.sh -f general/table/delete_reuse1.sim ./test.sh -f general/table/delete_reuse1.sim
#./test.sh -f general/table/delete_reuse2.sim ./test.sh -f general/table/delete_reuse2.sim
#./test.sh -f general/table/delete_writing.sim #hongze ./test.sh -f general/table/delete_writing.sim
#./test.sh -f general/table/describe.sim ./test.sh -f general/table/describe.sim
./test.sh -f general/table/double.sim ./test.sh -f general/table/double.sim
#./test.sh -f general/table/fill.sim ./test.sh -f general/table/fill.sim
./test.sh -f general/table/float.sim ./test.sh -f general/table/float.sim
./test.sh -f general/table/int.sim ./test.sh -f general/table/int.sim
./test.sh -f general/table/limit.sim ./test.sh -f general/table/limit.sim
@ -236,8 +231,6 @@ cd ../../../debug; make
./test.sh -f general/vector/table_query.sim ./test.sh -f general/vector/table_query.sim
./test.sh -f general/vector/table_time.sim ./test.sh -f general/vector/table_time.sim
#################################
./test.sh -u -f unique/account/account_create.sim ./test.sh -u -f unique/account/account_create.sim
./test.sh -u -f unique/account/account_delete.sim ./test.sh -u -f unique/account/account_delete.sim
./test.sh -u -f unique/account/account_len.sim ./test.sh -u -f unique/account/account_len.sim
@ -251,7 +244,7 @@ cd ../../../debug; make
./test.sh -u -f unique/account/user_len.sim ./test.sh -u -f unique/account/user_len.sim
#./test.sh -u -f unique/big/balance.sim #./test.sh -u -f unique/big/balance.sim
#./test.sh -u -f unique/big/maxvnodes.sim #slguan ./test.sh -u -f unique/big/maxvnodes.sim
./test.sh -u -f unique/big/tcp.sim ./test.sh -u -f unique/big/tcp.sim
##./test.sh -u -f unique/cluster/balance1.sim ##./test.sh -u -f unique/cluster/balance1.sim
@ -307,8 +300,6 @@ cd ../../../debug; make
#./test.sh -u -f unique/mnode/mgmtr2.sim #./test.sh -u -f unique/mnode/mgmtr2.sim
#./test.sh -u -f unique/mnode/secondIp.sim #./test.sh -u -f unique/mnode/secondIp.sim
#stream
##./test.sh -u -f unique/table/delete_part.sim ##./test.sh -u -f unique/table/delete_part.sim
#./test.sh -u -f unique/vnode/commit.sim #./test.sh -u -f unique/vnode/commit.sim

View File

@ -8,7 +8,7 @@ system sh/cfg.sh -n dnode2 -c numOfMPeers -v 2
system sh/cfg.sh -n dnode3 -c numOfMPeers -v 2 system sh/cfg.sh -n dnode3 -c numOfMPeers -v 2
print ============== step1 print ============== step1
system sh/exec_up.sh -n dnode1 -s start system sh/exec_up.sh -n dnode1 -s start -t
sleep 3000 sleep 3000
sql connect sql connect
@ -20,7 +20,7 @@ if $data2_1 != master then
endi endi
print ============== step2 print ============== step2
system sh/exec_up.sh -n dnode2 -s start system sh/exec_up.sh -n dnode2 -s start -t
sql create dnode $hostname2 sql create dnode $hostname2
$x = 0 $x = 0
@ -41,6 +41,17 @@ if $data2_2 != slave then
goto show2 goto show2
endi endi
system sh/exec_up.sh -n dnode1 -s stop -x SIGINT
system sh/exec_up.sh -n dnode2 -s stop -x SIGINT
system sh/exec_up.sh -n dnode3 -s stop -x SIGINT
system sh/exec_up.sh -n dnode4 -s stop -x SIGINT
system sh/exec_up.sh -n dnode5 -s stop -x SIGINT
system sh/exec_up.sh -n dnode6 -s stop -x SIGINT
system sh/exec_up.sh -n dnode7 -s stop -x SIGINT
system sh/exec_up.sh -n dnode8 -s stop -x SIGINT
return
print ============== step3 print ============== step3
sql_error drop dnode $hostname1 -x error1 sql_error drop dnode $hostname1 -x error1
print should not drop master print should not drop master