diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 4630ea48c9..f439d26606 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -427,7 +427,7 @@ static void doInitGlobalConfig() { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); - // 0-any; 1-mnode; 2-dnode + // 0-any; 1-mnode; 2-vnode cfg.option = "alternativeRole"; cfg.ptr = &tsAlternativeRole; cfg.valType = TAOS_CFG_VTYPE_INT32; diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 6e81db7db7..9406a2fdce 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -15,17 +15,19 @@ #define _DEFAULT_SOURCE +#include +#include #include #include -#include -#include + +#include "taos.h" #include "taosdef.h" #include "taosmsg.h" +#include "tcq.h" +#include "tdataformat.h" #include "tglobal.h" #include "tlog.h" #include "twal.h" -#include "tcq.h" -#include "taos.h" #define cError(...) { if (cqDebugFlag & DEBUG_ERROR) { taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__); }} #define cWarn(...) { if (cqDebugFlag & DEBUG_WARN) { taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__); }} @@ -46,15 +48,14 @@ typedef struct { } SCqContext; typedef struct SCqObj { - int tid; // table ID - int rowSize; // bytes of a row - char *sqlStr; // SQL string - int columns; // number of columns - SSchema *pSchema; // pointer to schema array - void *pStream; - struct SCqObj *prev; - struct SCqObj *next; - SCqContext *pContext; + int tid; // table ID + int rowSize; // bytes of a row + char * sqlStr; // SQL string + STSchema * pSchema; // pointer to schema array + void * pStream; + struct SCqObj *prev; + struct SCqObj *next; + SCqContext * pContext; } SCqObj; int cqDebugFlag = 135; @@ -152,7 +153,7 @@ void cqStop(void *handle) { pthread_mutex_unlock(&pContext->mutex); } -void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int columns) { +void *cqCreate(void *handle, int tid, char *sqlStr, STSchema *pSchema) { SCqContext *pContext = handle; SCqObj *pObj = calloc(sizeof(SCqObj), 1); @@ -162,11 +163,7 @@ void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int column pObj->sqlStr = malloc(strlen(sqlStr)+1); strcpy(pObj->sqlStr, sqlStr); - pObj->columns = columns; - - int size = sizeof(SSchema) * columns; - pObj->pSchema = malloc(size); - memcpy(pObj->pSchema, pSchema, size); + pObj->pSchema = tdDupSchema(pSchema); cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr); diff --git a/src/cq/test/cqtest.c b/src/cq/test/cqtest.c index 7977bd85bc..3aa649ee34 100644 --- a/src/cq/test/cqtest.c +++ b/src/cq/test/cqtest.c @@ -59,21 +59,16 @@ int main(int argc, char *argv[]) { exit(-1); } - SSchema schema[2]; - schema[0].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(schema[0].name, "ts"); - schema[0].colId = 0; - schema[0].bytes = 8; - - schema[1].type = TSDB_DATA_TYPE_INT; - strcpy(schema[1].name, "avgspeed"); - schema[1].colId = 1; - schema[1].bytes = 4; + STSchema *pSchema = tdNewSchema(2); + tdSchemaAddCol(pSchema, TSDB_DATA_TYPE_TIMESTAMP, 0, 8); + tdSchemaAddCol(pSchema, TSDB_DATA_TYPE_INT, 1, 4); for (int sid =1; sid<10; ++sid) { - cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", schema, 2); + cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema); } + tdFreeSchema(pSchema); + while (1) { char c = getchar(); diff --git a/src/inc/tcq.h b/src/inc/tcq.h index d0a2097c05..e025afaa0a 100644 --- a/src/inc/tcq.h +++ b/src/inc/tcq.h @@ -19,6 +19,7 @@ extern "C" { #endif +#include "tdataformat.h" typedef int (*FCqWrite)(void *ahandle, void *pHead, int type); @@ -40,7 +41,7 @@ void cqStart(void *handle); void cqStop(void *handle); // cqCreate is called by TSDB to start an instance of CQ -void *cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns); +void *cqCreate(void *handle, int sid, char *sqlStr, STSchema *pSchema); // cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate void cqDrop(void *handle); diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 341dee1476..c758d3aea4 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -43,6 +43,8 @@ typedef struct { void *cqH; int (*notifyStatus)(void *, int status); int (*eventCallBack)(void *); + void *(*cqCreateFunc)(void *handle, int sid, char *sqlStr, STSchema *pSchema); + void (*cqDropFunc)(void *handle); } STsdbAppH; // --------- TSDB REPOSITORY CONFIGURATION DEFINITION @@ -71,7 +73,7 @@ typedef void TsdbRepoT; // use void to hide implementation details from outside int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter); int32_t tsdbDropRepo(TsdbRepoT *repo); -TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH); +TsdbRepoT *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH); int32_t tsdbCloseRepo(TsdbRepoT *repo, int toCommit); int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg); diff --git a/src/mnode/inc/mnodeDnode.h b/src/mnode/inc/mnodeDnode.h index 2b12a29390..f95a163d83 100644 --- a/src/mnode/inc/mnodeDnode.h +++ b/src/mnode/inc/mnodeDnode.h @@ -27,6 +27,12 @@ typedef enum { TAOS_DN_STATUS_READY } EDnodeStatus; +typedef enum { + TAOS_DN_ALTERNATIVE_ROLE_ANY, + TAOS_DN_ALTERNATIVE_ROLE_MNODE, + TAOS_DN_ALTERNATIVE_ROLE_VNODE +} EDnodeAlternativeRole; + int32_t mnodeInitDnodes(); void mnodeCleanupDnodes(); diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 5aaa2049f1..506511ece9 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -58,6 +58,7 @@ static int32_t mnodeGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole); static int32_t mnodeDnodeActionDestroy(SSdbOper *pOper) { tfree(pOper->pObj); @@ -521,6 +522,12 @@ static int32_t mnodeGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; + pShow->bytes[cols] = 6 + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "alternativeRole"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; strcpy(pSchema[cols].name, "create_time"); @@ -572,12 +579,16 @@ static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, vo *(int16_t *)pWrite = pDnode->totalVnodes; cols++; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; char* status = mnodeGetDnodeStatusStr(pDnode->status); STR_TO_VARSTR(pWrite, status); cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + char* role = mnodeGetDnodeAlternativeRoleStr(pDnode->alternativeRole); + STR_TO_VARSTR(pWrite, role); + cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; *(int64_t *)pWrite = pDnode->createdTime; cols++; @@ -895,3 +906,13 @@ char* mnodeGetDnodeStatusStr(int32_t dnodeStatus) { default: return "undefined"; } } + +static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole) { + switch (alternativeRole) { + case TAOS_DN_ALTERNATIVE_ROLE_ANY: return "any"; + case TAOS_DN_ALTERNATIVE_ROLE_MNODE: return "mnode"; + case TAOS_DN_ALTERNATIVE_ROLE_VNODE: return "vnode"; + default:return "any"; + } +} + diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index 0973163cf9..b126e734d1 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -116,12 +116,6 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { return TSDB_CODE_OPS_NOT_SUPPORT; } - int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; - SCMShowRsp *pShowRsp = rpcMallocCont(size); - if (pShowRsp == NULL) { - return TSDB_CODE_SERV_OUT_OF_MEMORY; - } - int32_t showObjSize = sizeof(SShowObj) + htons(pShowMsg->payloadLen); SShowObj *pShow = (SShowObj *) calloc(1, showObjSize); pShow->signature = pShow; @@ -131,7 +125,13 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen); pShow = mnodeSaveShowObj(pShow, showObjSize); - if (pShow == NULL) { + if (pShow == NULL) { + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + + int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; + SCMShowRsp *pShowRsp = rpcMallocCont(size); + if (pShowRsp == NULL) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } pShowRsp->qhandle = htobe64((uint64_t) pShow); @@ -144,6 +144,7 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { return TSDB_CODE_SUCCESS; } else { mError("show:%p, type:%s, failed to get meta, reason:%s", pShow, mnodeGetShowType(pShowMsg->type), tstrerror(code)); + rpcFreeCont(pShowRsp); mnodeCleanupShowObj(pShow, true); return code; } diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 0839e0f8ff..9dd5136c95 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -87,6 +87,7 @@ typedef struct STable { struct STable *prev; tstr * name; // NOTE: there a flexible string here char * sql; + void * cqhandle; } STable; #define TSDB_GET_TABLE_LAST_KEY(tb) ((tb)->lastKey) @@ -110,6 +111,7 @@ typedef struct { SMetaFile *mfh; // meta file handle int maxRowBytes; int maxCols; + void * pRepo; } STsdbMeta; // element put in skiplist for each table @@ -118,7 +120,7 @@ typedef struct STableIndexElem { STable* pTable; } STableIndexElem; -STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables); +STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables, void *pRepo); int32_t tsdbFreeMeta(STsdbMeta *pMeta); STSchema * tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable); STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index bddb3fcaff..9c8e57d18a 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -189,9 +189,9 @@ _err: * * @return a TSDB repository handle on success, NULL for failure and the error number is set */ -TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) { +TsdbRepoT *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) { char dataDir[128] = "\0"; - if (access(tsdbDir, F_OK | W_OK | R_OK) < 0) { + if (access(rootDir, F_OK | W_OK | R_OK) < 0) { return NULL; } @@ -200,12 +200,12 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) { return NULL; } - pRepo->rootDir = strdup(tsdbDir); + pRepo->rootDir = strdup(rootDir); tsdbRestoreCfg(pRepo, &(pRepo->config)); if (pAppH) pRepo->appH = *pAppH; - pRepo->tsdbMeta = tsdbInitMeta(tsdbDir, pRepo->config.maxTables); + pRepo->tsdbMeta = tsdbInitMeta(rootDir, pRepo->config.maxTables, pRepo); if (pRepo->tsdbMeta == NULL) { free(pRepo->rootDir); free(pRepo); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index e320de9827..0d9e6a9cf8 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -142,6 +142,7 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { void tsdbOrgMeta(void *pHandle) { STsdbMeta *pMeta = (STsdbMeta *)pHandle; + STsdbRepo *pRepo = (STsdbRepo *)pMeta->pRepo; for (int i = 1; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; @@ -149,13 +150,20 @@ void tsdbOrgMeta(void *pHandle) { tsdbAddTableIntoIndex(pMeta, pTable); } } + + for (int i = 0; i < pMeta->maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable && pTable->type == TSDB_STREAM_TABLE) { + pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, i, pTable->sql, tsdbGetTableSchema(pMeta, pTable)); + } + } } /** * Initialize the meta handle * ASSUMPTIONS: VALID PARAMETER */ -STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) { +STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables, void *pRepo) { STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta)); if (pMeta == NULL) return NULL; @@ -165,6 +173,7 @@ STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) { pMeta->tables = (STable **)calloc(maxTables, sizeof(STable *)); pMeta->maxRowBytes = 0; pMeta->maxCols = 0; + pMeta->pRepo = pRepo; if (pMeta->tables == NULL) { free(pMeta); return NULL; @@ -189,13 +198,16 @@ STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) { } int32_t tsdbFreeMeta(STsdbMeta *pMeta) { + STsdbRepo *pRepo = (STsdbRepo *)pMeta->pRepo; if (pMeta == NULL) return 0; tsdbCloseMetaFile(pMeta->mfh); for (int i = 1; i < pMeta->maxTables; i++) { if (pMeta->tables[i] != NULL) { - tsdbFreeTable(pMeta->tables[i]); + STable *pTable = pMeta->tables[i]; + if (pTable->type == TSDB_STREAM_TABLE) (*pRepo->appH.cqDropFunc)(pTable->cqhandle); + tsdbFreeTable(pTable); } } @@ -512,6 +524,7 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) { } static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { + STsdbRepo *pRepo = (STsdbRepo *)pMeta->pRepo; if (pTable->type == TSDB_SUPER_TABLE) { // add super table to the linked list if (pMeta->superList == NULL) { @@ -531,7 +544,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { tsdbAddTableIntoIndex(pMeta, pTable); } if (pTable->type == TSDB_STREAM_TABLE && addIdx) { - // TODO + pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable)); } pMeta->nTables++; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index b25180f0f0..cc92c03389 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -220,6 +220,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { appH.appH = (void *)pVnode; appH.notifyStatus = vnodeProcessTsdbStatus; appH.cqH = pVnode->cq; + appH.cqCreateFunc = cqCreate; + appH.cqDropFunc = cqDrop; sprintf(temp, "%s/tsdb", rootDir); pVnode->tsdb = tsdbOpenRepo(temp, &appH); if (pVnode->tsdb == NULL) { @@ -391,14 +393,14 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { pVnode->sync = NULL; } - if (pVnode->wal) - walClose(pVnode->wal); - pVnode->wal = NULL; - if (pVnode->tsdb) tsdbCloseRepo(pVnode->tsdb, 1); pVnode->tsdb = NULL; + if (pVnode->wal) + walClose(pVnode->wal); + pVnode->wal = NULL; + if (pVnode->cq) cqClose(pVnode->cq); pVnode->cq = NULL; @@ -467,6 +469,8 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { appH.appH = (void *)pVnode; appH.notifyStatus = vnodeProcessTsdbStatus; appH.cqH = pVnode->cq; + appH.cqCreateFunc = cqCreate; + appH.cqDropFunc = cqDrop; pVnode->tsdb = tsdbOpenRepo(rootDir, &appH); } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index dc8c564fd9..eb392bb00b 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -269,6 +269,7 @@ cd ../../../debug; make ./test.sh -u -f unique/db/replica_reduce31.sim ./test.sh -u -f unique/db/replica_part.sim +./test.sh -u -f unique/dnode/alternativeRole.sim ./test.sh -u -f unique/dnode/balance1.sim ./test.sh -u -f unique/dnode/balance2.sim ./test.sh -u -f unique/dnode/balance3.sim diff --git a/tests/script/unique/dnode/alternativeRole.sim b/tests/script/unique/dnode/alternativeRole.sim new file mode 100644 index 0000000000..c9815fdf39 --- /dev/null +++ b/tests/script/unique/dnode/alternativeRole.sim @@ -0,0 +1,91 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 + +system sh/cfg.sh -n dnode1 -c alternativeRole -v 1 +system sh/cfg.sh -n dnode2 -c alternativeRole -v 2 +system sh/cfg.sh -n dnode3 -c alternativeRole -v 0 + +system sh/cfg.sh -n dnode1 -c wallevel -v 1 +system sh/cfg.sh -n dnode2 -c wallevel -v 1 +system sh/cfg.sh -n dnode3 -c wallevel -v 1 + +system sh/cfg.sh -n dnode1 -c numOfMpeers -v 3 +system sh/cfg.sh -n dnode2 -c numOfMpeers -v 3 +system sh/cfg.sh -n dnode3 -c numOfMpeers -v 3 + +print ========== step1 +system sh/exec_up.sh -n dnode1 -s start +sql connect +sql create dnode $hostname2 +system sh/exec_up.sh -n dnode2 -s start +sleep 3000 +sql create dnode $hostname3 +system sh/exec_up.sh -n dnode3 -s start +sleep 3000 + +sql show dnodes +print dnode1 $data5_1 +print dnode1 $data5_2 +print dnode1 $data5_3 + +if $data5_1 != mnode then + return -1 +endi +if $data5_2 != vnode then + return -1 +endi +if $data5_3 != any then + return -1 +endi + +sql show mnodes +print dnode1 ==> $data2_1 +print dnode2 ==> $data2_2 +print dnode3 ==> $data2_3 +if $data2_1 != master then + return -1 +endi +if $data2_2 != null then + return -1 +endi +if $data2_3 != slave then + return -1 +endi + +print ========== step2 +sql create database d1 maxTables 4 +sql create table d1.t1 (ts timestamp, i int) +sql create table d1.t2 (ts timestamp, i int) +sql create table d1.t3 (ts timestamp, i int) +sql create table d1.t4 (ts timestamp, i int) +sql create table d1.t5 (ts timestamp, i int) +sql create table d1.t6 (ts timestamp, i int) +sql create table d1.t7 (ts timestamp, i int) +sql create table d1.t8 (ts timestamp, i int) + +sql show dnodes +print dnode1 $data2_1 +print dnode2 $data2_2 +print dnode3 $data2_3 + +if $data2_1 != 0 then + return -1 +endi +if $data2_2 != 1 then + return -1 +endi +if $data2_3 != 1 then + return -1 +endi + +system sh/exec_up.sh -n dnode1 -s stop -x SIGINT +system sh/exec_up.sh -n dnode2 -s stop -x SIGINT +system sh/exec_up.sh -n dnode3 -s stop -x SIGINT +system sh/exec_up.sh -n dnode4 -s stop -x SIGINT +system sh/exec_up.sh -n dnode5 -s stop -x SIGINT +system sh/exec_up.sh -n dnode6 -s stop -x SIGINT +system sh/exec_up.sh -n dnode7 -s stop -x SIGINT +system sh/exec_up.sh -n dnode8 -s stop -x SIGINT \ No newline at end of file