diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index 4bd89e238e..a81f8c0c9d 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -28,7 +28,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ENDIF () IF (TD_VPEER) - TARGET_LINK_LIBRARIES(taosd balance) + TARGET_LINK_LIBRARIES(taosd balance sync) ENDIF () SET(PREPARE_ENV_CMD "prepare_env_cmd") diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index fa3ad946e3..da80206e4c 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -131,7 +131,7 @@ static int32_t dnodeOpenVnodes() { char vnodeDir[TSDB_FILENAME_LEN * 3]; int32_t failed = 0; - int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * 10000); + int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES); int32_t numOfVnodes = dnodeGetVnodeList(vnodeList); for (int32_t i = 0; i < numOfVnodes; ++i) { @@ -146,7 +146,7 @@ static int32_t dnodeOpenVnodes() { } static void dnodeCloseVnodes() { - int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * 10000); + int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES); int32_t numOfVnodes = dnodeGetVnodeList(vnodeList); for (int32_t i = 0; i < numOfVnodes; ++i) { diff --git a/src/inc/dnode.h b/src/inc/dnode.h index fa5e3a3b3d..db39906c68 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -42,7 +42,7 @@ void *dnodeAllocateWqueue(void *pVnode); void dnodeFreeWqueue(void *queue); void *dnodeAllocateRqueue(void *pVnode); void dnodeFreeRqueue(void *rqueue); -void dnodeSendWriteResponse(void *pVnode, void *param, int32_t code); +void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code); #ifdef __cplusplus } diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 555e0503da..ea20ceec79 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -66,11 +66,11 @@ typedef struct { // if name is null, get the file from index or after, used by master // if name is provided, get the named file at the specified index, used by unsynced node // it returns the file magic number and size, if file not there, magic shall be 0. - uint32_t (*getFileInfo)(char *name, int *index, int *size); + uint32_t (*getFileInfo)(void *ahandle, char *name, uint32_t *index, int32_t *size); // get the wal file from index or after // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file - int (*getWalInfo)(char *name, int *index); + int (*getWalInfo)(void *ahandle, char *name, uint32_t *index); // when a forward pkt is received, call this to handle data int (*writeToCache)(void *ahandle, void *pHead, int type); @@ -94,11 +94,13 @@ int syncGetNodesRole(tsync_h shandle, SNodesRole *); extern char *syncRole[]; +//global configurable parameters extern int tsMaxSyncNum; extern int tsSyncTcpThreads; extern int tsMaxWatchFiles; extern short tsSyncPort; extern int tsMaxFwdInfo; +extern int sDebugFlag; #ifdef __cplusplus } diff --git a/src/inc/twal.h b/src/inc/twal.h index 3648f5ae29..53d4f835b0 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -40,12 +40,12 @@ typedef struct { typedef void* twal_h; // WAL HANDLE -twal_h walOpen(char *path, int max, int level); +twal_h walOpen(char *path, SWalCfg *pCfg); void walClose(twal_h); int walRenew(twal_h); int walWrite(twal_h, SWalHead *); void walFsync(twal_h); -int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, SWalHead *pHead, int type)); +int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pHead, int type)); int walGetWalFile(twal_h, char *name, uint32_t *index); extern int wDebugFlag; diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 31426fc53c..f50d3e0698 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -935,6 +935,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet)); tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps); rpcSendReqToServer(pRpc, pContext); + } else if (pHead->code == TSDB_CODE_NOT_READY) { + pConn->pContext->code = pHead->code; + rpcProcessConnError(pConn->pContext, NULL); } else { rpcNotifyClient(pContext, &rpcMsg); } @@ -1079,7 +1082,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) tTrace( "%s %p, %s is sent to %s:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort, - pHead->code, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); + htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle); diff --git a/src/util/src/tstring.c b/src/util/src/tstring.c index 7aca939f47..5a134df129 100644 --- a/src/util/src/tstring.c +++ b/src/util/src/tstring.c @@ -110,6 +110,8 @@ char *taosMsg[] = { "", "", "", + "", + "", "", //90 "config-table", diff --git a/src/vnode/main/inc/vnodeInt.h b/src/vnode/main/inc/vnodeInt.h index 96526d7209..f1b9d55d54 100644 --- a/src/vnode/main/inc/vnodeInt.h +++ b/src/vnode/main/inc/vnodeInt.h @@ -27,7 +27,7 @@ typedef struct { int32_t vgId; // global vnode group ID int32_t refCount; // reference count int status; - int role; + int8_t role; int64_t version; void *wqueue; void *rqueue; @@ -41,7 +41,7 @@ typedef struct { SWalCfg walCfg; } SVnodeObj; -int vnodeWriteToQueue(void *param, SWalHead *pHead, int type); +int vnodeWriteToQueue(void *param, void *pHead, int type); void vnodeInitWriteFp(void); void vnodeInitReadFp(void); diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index 56d079bf4b..987fd69a70 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -28,14 +28,17 @@ #include "vnode.h" #include "vnodeInt.h" -static void *tsDnodeVnodesHash; -static void vnodeCleanUp(SVnodeObj *pVnode); -static void vnodeBuildVloadMsg(char *pNode, void * param); -static int vnodeWALCallback(void *arg); -static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); -static int32_t vnodeReadCfg(SVnodeObj *pVnode); +static void *tsDnodeVnodesHash; +static void vnodeCleanUp(SVnodeObj *pVnode); +static void vnodeBuildVloadMsg(char *pNode, void * param); +static int vnodeWalCallback(void *arg); +static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); +static int32_t vnodeReadCfg(SVnodeObj *pVnode); +static int vnodeWalCallback(void *arg); +static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size); +static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); +static void vnodeNotifyRole(void *ahandle, int8_t role); -static int32_t tsOpennedVnodes; static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; static void vnodeInit() { @@ -138,14 +141,27 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->rqueue = dnodeAllocateRqueue(pVnode); sprintf(temp, "%s/wal", rootDir); - pVnode->wal = walOpen(temp, pVnode->walCfg.wals, pVnode->walCfg.commitLog); - pVnode->sync = NULL; + pVnode->wal = walOpen(temp, &pVnode->walCfg); + + SSyncInfo syncInfo; + syncInfo.vgId = pVnode->vgId; + syncInfo.version = pVnode->version; + syncInfo.syncCfg = pVnode->syncCfg; + sprintf(syncInfo.path, "%s/tsdb/", rootDir); + syncInfo.ahandle = pVnode; + syncInfo.getWalInfo = vnodeGetWalInfo; + syncInfo.getFileInfo = vnodeGetFileInfo; + syncInfo.writeToCache = vnodeWriteToQueue; + syncInfo.confirmForward = dnodeSendRpcWriteRsp; + syncInfo.notifyRole = vnodeNotifyRole; + pVnode->sync = syncStart(&syncInfo);; + pVnode->events = NULL; pVnode->cq = NULL; STsdbAppH appH = {0}; appH.appH = (void *)pVnode; - appH.walCallBack = vnodeWALCallback; + appH.walCallBack = vnodeWalCallback; sprintf(temp, "%s/tsdb", rootDir); void *pTsdb = tsdbOpenRepo(temp, &appH); @@ -162,7 +178,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->status = TAOS_VN_STATUS_READY; dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir); - tsOpennedVnodes++; + atomic_add_fetch_32(&tsOpennedVnodes, 1); return TSDB_CODE_SUCCESS; } @@ -203,8 +219,8 @@ void vnodeRelease(void *pVnodeRaw) { dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId); free(pVnode); - tsOpennedVnodes--; - if (tsOpennedVnodes <= 0) { + int32_t count = atomic_sub_fetch_32(&tsOpennedVnodes, 1); + if (count <= 0) { taosCleanUpIntHash(tsDnodeVnodesHash); vnodeModuleInit = PTHREAD_ONCE_INIT; tsDnodeVnodesHash = NULL; @@ -280,11 +296,27 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { } // TODO: this is a simple implement -static int vnodeWALCallback(void *arg) { +static int vnodeWalCallback(void *arg) { SVnodeObj *pVnode = arg; return walRenew(pVnode->wal); } +static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) { + // SVnodeObj *pVnode = ahandle; + //tsdbGetFileInfo(pVnode->tsdb, name, index, size); + return 0; +} + +static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) { + SVnodeObj *pVnode = ahandle; + return walGetWalFile(pVnode->wal, name, index); +} + +static void vnodeNotifyRole(void *ahandle, int8_t role) { + SVnodeObj *pVnode = ahandle; + pVnode->role = role; +} + static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { char cfgFile[TSDB_FILENAME_LEN * 2] = {0}; sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnodeCfg->cfg.vgId); @@ -335,7 +367,7 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; if (strcmp(option[0], "arbitratorIp") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; if (arbitratorIp == -1) return TSDB_CODE_INVALID_FILE_FORMAT; - pVnode->syncCfg.arbitratorIp = arbitratorIp; + pVnode->syncCfg.arbitratorIp = 0; int32_t quorum = -1; num = fscanf(fp, "%s %d", option[0], &quorum); diff --git a/src/vnode/main/src/vnodeWrite.c b/src/vnode/main/src/vnodeWrite.c index 249836ddd1..5e03305487 100644 --- a/src/vnode/main/src/vnodeWrite.c +++ b/src/vnode/main/src/vnodeWrite.c @@ -78,11 +78,8 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item); if (code < 0) return code; -/* forward - if (pVnode->replica > 1 && pVnode->role == TAOS_SYNC_ROLE_MASTER) { + if (pVnode->syncCfg.replica > 1) code = syncForwardToPeer(pVnode->sync, pHead, item); - } -*/ return code; } @@ -252,8 +249,9 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet return code; } -int vnodeWriteToQueue(void *param, SWalHead *pHead, int type) { +int vnodeWriteToQueue(void *param, void *data, int type) { SVnodeObj *pVnode = param; + SWalHead *pHead = data; int size = sizeof(SWalHead) + pHead->len; SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); diff --git a/src/vnode/wal/src/walMain.c b/src/vnode/wal/src/walMain.c index 504e370279..1738967098 100644 --- a/src/vnode/wal/src/walMain.c +++ b/src/vnode/wal/src/walMain.c @@ -49,18 +49,18 @@ int wDebugFlag = 135; static uint32_t walSignature = 0xFAFBFDFE; static int walHandleExistingFiles(char *path); -static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SWalHead *, int)); +static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *, int)); static int walRemoveWalFiles(char *path); -void *walOpen(char *path, int max, int level) { +void *walOpen(char *path, SWalCfg *pCfg) { SWal *pWal = calloc(sizeof(SWal), 1); if (pWal == NULL) return NULL; pWal->fd = -1; - pWal->max = max; + pWal->max = pCfg->wals; pWal->id = 0; pWal->num = 0; - pWal->level = level; + pWal->level = pCfg->commitLog; strcpy(pWal->path, path); pthread_mutex_init(&pWal->mutex, NULL); @@ -170,7 +170,7 @@ void walFsync(void *handle) { fsync(pWal->fd); } -int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, SWalHead *, int)) { +int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) { SWal *pWal = (SWal *)handle; int code = 0; struct dirent *ent; @@ -247,7 +247,7 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) { return code; } -static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SWalHead *, int)) { +static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *, int)) { int code = 0; char *buffer = malloc(1024000); // size for one record diff --git a/src/vnode/wal/test/waltest.c b/src/vnode/wal/test/waltest.c index 37d1d8e84c..8e10bc11e5 100644 --- a/src/vnode/wal/test/waltest.c +++ b/src/vnode/wal/test/waltest.c @@ -21,8 +21,10 @@ int64_t ver = 0; void *pWal = NULL; -int writeToQueue(void *pVnode, SWalHead *pHead, int type) { +int writeToQueue(void *pVnode, void *data, int type) { // do nothing + SWalHead *pHead = data; + if (pHead->version > ver) ver = pHead->version; @@ -74,7 +76,11 @@ int main(int argc, char *argv[]) { taosInitLog("wal.log", 100000, 10); - pWal = walOpen(path, max, level); + SWalCfg walCfg; + walCfg.commitLog = level; + walCfg.wals = max; + + pWal = walOpen(path, &walCfg); if (pWal == NULL) { printf("failed to open wal\n"); exit(-1);