Merge pull request #4177 from taosdata/feature/wal

[TD-2944]<fix>:Inaccurate error code when the SQL in restful interface fails
This commit is contained in:
Shengliang Guan 2020-11-10 22:48:35 +08:00 committed by GitHub
commit 545a31f4bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 264 additions and 254 deletions

View File

@ -51,9 +51,9 @@ typedef struct {
} SSyncCfg; } SSyncCfg;
typedef struct { typedef struct {
int selfIndex; int32_t selfIndex;
uint32_t nodeId[TAOS_SYNC_MAX_REPLICA]; uint32_t nodeId[TAOS_SYNC_MAX_REPLICA];
int role[TAOS_SYNC_MAX_REPLICA]; int32_t role[TAOS_SYNC_MAX_REPLICA];
} SNodesRole; } SNodesRole;
/* /*
@ -83,25 +83,24 @@ typedef void (*FNotifyRole)(void *ahandle, int8_t role);
typedef void (*FNotifyFlowCtrl)(void *ahandle, int32_t mseconds); typedef void (*FNotifyFlowCtrl)(void *ahandle, int32_t mseconds);
// when data file is synced successfully, notity app // when data file is synced successfully, notity app
typedef int (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); typedef int32_t (*FNotifyFileSynced)(void *ahandle, uint64_t fversion);
typedef struct { typedef struct {
int32_t vgId; // vgroup ID int32_t vgId; // vgroup ID
uint64_t version; // initial version uint64_t version; // initial version
SSyncCfg syncCfg; // configuration from mgmt SSyncCfg syncCfg; // configuration from mgmt
char path[128]; // path to the file char path[128]; // path to the file
void * ahandle; // handle provided by APP
void *ahandle; // handle provided by APP FGetFileInfo getFileInfo;
FGetFileInfo getFileInfo; FGetWalInfo getWalInfo;
FGetWalInfo getWalInfo; FWriteToCache writeToCache;
FWriteToCache writeToCache; FConfirmForward confirmForward;
FConfirmForward confirmForward; FNotifyRole notifyRole;
FNotifyRole notifyRole; FNotifyFlowCtrl notifyFlowCtrl;
FNotifyFlowCtrl notifyFlowCtrl;
FNotifyFileSynced notifyFileSynced; FNotifyFileSynced notifyFileSynced;
} SSyncInfo; } SSyncInfo;
typedef void* tsync_h; typedef void *tsync_h;
int32_t syncInit(); int32_t syncInit();
void syncCleanUp(); void syncCleanUp();
@ -109,22 +108,22 @@ void syncCleanUp();
int64_t syncStart(const SSyncInfo *); int64_t syncStart(const SSyncInfo *);
void syncStop(int64_t rid); void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg *); int32_t syncReconfig(int64_t rid, const SSyncCfg *);
int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype); int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype);
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code); void syncConfirmForward(int64_t rid, uint64_t version, int32_t code);
void syncRecover(int64_t rid); // recover from other nodes: void syncRecover(int64_t rid); // recover from other nodes:
int syncGetNodesRole(int64_t rid, SNodesRole *); int32_t syncGetNodesRole(int64_t rid, SNodesRole *);
extern char *syncRole[]; extern char *syncRole[];
//global configurable parameters //global configurable parameters
extern int tsMaxSyncNum; extern int32_t tsMaxSyncNum;
extern int tsSyncTcpThreads; extern int32_t tsSyncTcpThreads;
extern int tsMaxWatchFiles; extern int32_t tsMaxWatchFiles;
extern int tsSyncTimer; extern int32_t tsSyncTimer;
extern int tsMaxFwdInfo; extern int32_t tsMaxFwdInfo;
extern int sDebugFlag; extern int32_t sDebugFlag;
extern char tsArbitrator[]; extern char tsArbitrator[];
extern uint16_t tsSyncPort; extern uint16_t tsSyncPort;
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -22,9 +22,11 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
typedef void (*FHttpResultFp)(void *param, void *result, int32_t code, int32_t rows);
bool httpInitResultQueue(); bool httpInitResultQueue();
void httpCleanupResultQueue(); void httpCleanupResultQueue();
void httpDispatchToResultQueue(); void httpDispatchToResultQueue(void *param, TAOS_RES *result, int32_t code, int32_t rows, FHttpResultFp fp);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -25,6 +25,7 @@
#include "httpResp.h" #include "httpResp.h"
#include "httpAuth.h" #include "httpAuth.h"
#include "httpSession.h" #include "httpSession.h"
#include "httpQueue.h"
typedef struct { typedef struct {
pthread_t thread; pthread_t thread;
@ -37,33 +38,35 @@ typedef struct {
} SHttpWorkerPool; } SHttpWorkerPool;
typedef struct { typedef struct {
void *param; void * param;
void *result; void * result;
int32_t numOfRows; int32_t code;
void (*fp)(void *param, void *result, int32_t numOfRows); int32_t rows;
FHttpResultFp fp;
} SHttpResult; } SHttpResult;
static SHttpWorkerPool tsHttpPool; static SHttpWorkerPool tsHttpPool;
static taos_qset tsHttpQset; static taos_qset tsHttpQset;
static taos_queue tsHttpQueue; static taos_queue tsHttpQueue;
void httpDispatchToResultQueue(void *param, TAOS_RES *result, int32_t numOfRows, void (*fp)(void *param, void *result, int32_t numOfRows)) { void httpDispatchToResultQueue(void *param, TAOS_RES *result, int32_t code, int32_t rows, FHttpResultFp fp) {
if (tsHttpQueue != NULL) { if (tsHttpQueue != NULL) {
SHttpResult *pMsg = taosAllocateQitem(sizeof(SHttpResult)); SHttpResult *pMsg = taosAllocateQitem(sizeof(SHttpResult));
pMsg->param = param; pMsg->param = param;
pMsg->result = result; pMsg->result = result;
pMsg->numOfRows = numOfRows; pMsg->code = code;
pMsg->rows = rows;
pMsg->fp = fp; pMsg->fp = fp;
taosWriteQitem(tsHttpQueue, TAOS_QTYPE_RPC, pMsg); taosWriteQitem(tsHttpQueue, TAOS_QTYPE_RPC, pMsg);
} else { } else {
(*fp)(param, result, numOfRows); (*fp)(param, result, code, rows);
} }
} }
static void *httpProcessResultQueue(void *param) { static void *httpProcessResultQueue(void *param) {
SHttpResult *pMsg; SHttpResult *pMsg;
int32_t type; int32_t type;
void *unUsed; void * unUsed;
while (1) { while (1) {
if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) { if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) {
@ -71,8 +74,9 @@ static void *httpProcessResultQueue(void *param) {
break; break;
} }
httpTrace("context:%p, res:%p will be processed in result queue", pMsg->param, pMsg->result); httpTrace("context:%p, res:%p will be processed in result queue, code:%d rows:%d", pMsg->param, pMsg->result,
(*pMsg->fp)(pMsg->param, pMsg->result, pMsg->numOfRows); pMsg->code, pMsg->rows);
(*pMsg->fp)(pMsg->param, pMsg->result, pMsg->code, pMsg->rows);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }

View File

@ -29,9 +29,9 @@
void httpProcessMultiSql(HttpContext *pContext); void httpProcessMultiSql(HttpContext *pContext);
void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows); void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int32_t numOfRows);
void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int numOfRows) { void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int32_t code, int32_t numOfRows) {
HttpContext *pContext = (HttpContext *)param; HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL) return; if (pContext == NULL) return;
@ -43,7 +43,7 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int n
bool isContinue = false; bool isContinue = false;
if (numOfRows > 0) { if (code == TSDB_CODE_SUCCESS && numOfRows > 0) {
if (singleCmd->cmdReturnType == HTTP_CMD_RETURN_TYPE_WITH_RETURN && encode->buildQueryJsonFp) { if (singleCmd->cmdReturnType == HTTP_CMD_RETURN_TYPE_WITH_RETURN && encode->buildQueryJsonFp) {
isContinue = (encode->buildQueryJsonFp)(pContext, singleCmd, result, numOfRows); isContinue = (encode->buildQueryJsonFp)(pContext, singleCmd, result, numOfRows);
} }
@ -58,9 +58,9 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int n
httpDebug("context:%p, fd:%d, user:%s, process pos:%d, stop retrieve, numOfRows:%d, sql:%s", pContext, pContext->fd, httpDebug("context:%p, fd:%d, user:%s, process pos:%d, stop retrieve, numOfRows:%d, sql:%s", pContext, pContext->fd,
pContext->user, multiCmds->pos, numOfRows, sql); pContext->user, multiCmds->pos, numOfRows, sql);
if (numOfRows < 0) { if (code < 0) {
httpError("context:%p, fd:%d, user:%s, process pos:%d, retrieve failed code:%s, sql:%s", pContext, pContext->fd, httpError("context:%p, fd:%d, user:%s, process pos:%d, retrieve failed code:%s, sql:%s", pContext, pContext->fd,
pContext->user, multiCmds->pos, tstrerror(numOfRows), sql); pContext->user, multiCmds->pos, tstrerror(code), sql);
} }
taos_free_result(result); taos_free_result(result);
@ -73,15 +73,15 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int n
} }
} }
void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) { void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int32_t numOfRows) {
httpDispatchToResultQueue(param, result, numOfRows, httpProcessMultiSqlRetrieveCallBackImp); int32_t code = taos_errno(result);
httpDispatchToResultQueue(param, result, code, numOfRows, httpProcessMultiSqlRetrieveCallBackImp);
} }
void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) { void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int32_t code, int32_t affectRowsInput) {
HttpContext *pContext = (HttpContext *)param; HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL) return; if (pContext == NULL) return;
code = taos_errno(result);
HttpSqlCmds *multiCmds = pContext->multiCmds; HttpSqlCmds *multiCmds = pContext->multiCmds;
HttpEncodeMethod *encode = pContext->encodeMethod; HttpEncodeMethod *encode = pContext->encodeMethod;
@ -94,7 +94,7 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) {
return; return;
} }
if (code < 0) { if (code != TSDB_CODE_SUCCESS) {
if (encode->checkFinishedFp != NULL && !encode->checkFinishedFp(pContext, singleCmd, code)) { if (encode->checkFinishedFp != NULL && !encode->checkFinishedFp(pContext, singleCmd, code)) {
singleCmd->code = code; singleCmd->code = code;
httpDebug("context:%p, fd:%d, user:%s, process pos jump to:%d, last code:%s, last sql:%s", pContext, pContext->fd, httpDebug("context:%p, fd:%d, user:%s, process pos jump to:%d, last code:%s, last sql:%s", pContext, pContext->fd,
@ -119,7 +119,7 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) {
bool isUpdate = tscIsUpdateQuery(result); bool isUpdate = tscIsUpdateQuery(result);
if (isUpdate) { if (isUpdate) {
// not select or show commands // not select or show commands
int affectRows = taos_affected_rows(result); int32_t affectRows = taos_affected_rows(result);
httpDebug("context:%p, fd:%d, user:%s, process pos:%d, affect rows:%d, sql:%s", pContext, pContext->fd, httpDebug("context:%p, fd:%d, user:%s, process pos:%d, affect rows:%d, sql:%s", pContext, pContext->fd,
pContext->user, multiCmds->pos, affectRows, sql); pContext->user, multiCmds->pos, affectRows, sql);
@ -156,8 +156,10 @@ void httpProcessMultiSqlCallBackImp(void *param, TAOS_RES *result, int code) {
} }
} }
void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) { void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int32_t unUsedCode) {
httpDispatchToResultQueue(param, result, unUsedCode, httpProcessMultiSqlCallBackImp); int32_t code = taos_errno(result);
int32_t affectRows = taos_affected_rows(result);
httpDispatchToResultQueue(param, result, code, affectRows, httpProcessMultiSqlCallBackImp);
} }
void httpProcessMultiSql(HttpContext *pContext) { void httpProcessMultiSql(HttpContext *pContext) {
@ -202,9 +204,9 @@ void httpProcessMultiSqlCmd(HttpContext *pContext) {
httpProcessMultiSql(pContext); httpProcessMultiSql(pContext);
} }
void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows); void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int32_t numOfRows);
void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int numOfRows) { void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int32_t code, int32_t numOfRows) {
HttpContext *pContext = (HttpContext *)param; HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL) return; if (pContext == NULL) return;
@ -212,7 +214,7 @@ void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int
bool isContinue = false; bool isContinue = false;
if (numOfRows > 0) { if (code == TSDB_CODE_SUCCESS && numOfRows > 0) {
if (encode->buildQueryJsonFp) { if (encode->buildQueryJsonFp) {
isContinue = (encode->buildQueryJsonFp)(pContext, &pContext->singleCmd, result, numOfRows); isContinue = (encode->buildQueryJsonFp)(pContext, &pContext->singleCmd, result, numOfRows);
} }
@ -227,9 +229,9 @@ void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int
httpDebug("context:%p, fd:%d, user:%s, stop retrieve, numOfRows:%d", pContext, pContext->fd, pContext->user, httpDebug("context:%p, fd:%d, user:%s, stop retrieve, numOfRows:%d", pContext, pContext->fd, pContext->user,
numOfRows); numOfRows);
if (numOfRows < 0) { if (code < 0) {
httpError("context:%p, fd:%d, user:%s, retrieve failed, code:%s", pContext, pContext->fd, pContext->user, httpError("context:%p, fd:%d, user:%s, retrieve failed, code:%s", pContext, pContext->fd, pContext->user,
tstrerror(numOfRows)); tstrerror(code));
} }
taos_free_result(result); taos_free_result(result);
@ -242,30 +244,30 @@ void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int
} }
} }
void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) { void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int32_t numOfRows) {
httpDispatchToResultQueue(param, result, numOfRows, httpProcessSingleSqlRetrieveCallBackImp); int32_t code = taos_errno(result);
httpDispatchToResultQueue(param, result, code, numOfRows, httpProcessSingleSqlRetrieveCallBackImp);
} }
void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCode) { void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int32_t code, int32_t affectRowsInput) {
HttpContext *pContext = (HttpContext *)param; HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL) return; if (pContext == NULL) return;
int32_t code = taos_errno(result);
HttpEncodeMethod *encode = pContext->encodeMethod; HttpEncodeMethod *encode = pContext->encodeMethod;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
httpError("context:%p, fd:%d, user:%s, query error, code:%s:inprogress, sqlObj:%p", pContext, pContext->fd, httpError("context:%p, fd:%d, user:%s, query error, code:%s:inprogress, sqlObj:%p", pContext, pContext->fd,
pContext->user, tstrerror(code), (SSqlObj *)result); pContext->user, tstrerror(code), result);
return; return;
} }
if (code < 0) { if (code != TSDB_CODE_SUCCESS) {
SSqlObj *pObj = (SSqlObj *)result; SSqlObj *pObj = (SSqlObj *)result;
if (code == TSDB_CODE_TSC_INVALID_SQL) { if (code == TSDB_CODE_TSC_INVALID_SQL) {
httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p, error:%s", pContext, terrno = code;
pContext->fd, pContext->user, tstrerror(code), pObj, pObj->cmd.payload); httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p, error:%s", pContext, pContext->fd,
httpSendTaosdInvalidSqlErrorResp(pContext, pObj->cmd.payload); pContext->user, tstrerror(code), pObj, taos_errstr(pObj));
httpSendTaosdInvalidSqlErrorResp(pContext, taos_errstr(pObj));
} else { } else {
httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p", pContext, pContext->fd, httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p", pContext, pContext->fd,
pContext->user, tstrerror(code), pObj); pContext->user, tstrerror(code), pObj);
@ -278,7 +280,8 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo
bool isUpdate = tscIsUpdateQuery(result); bool isUpdate = tscIsUpdateQuery(result);
if (isUpdate) { if (isUpdate) {
// not select or show commands // not select or show commands
int affectRows = taos_affected_rows(result); int32_t affectRows = taos_affected_rows(result);
assert(affectRows == affectRowsInput);
httpDebug("context:%p, fd:%d, user:%s, affect rows:%d, stop query, sqlObj:%p", pContext, pContext->fd, httpDebug("context:%p, fd:%d, user:%s, affect rows:%d, stop query, sqlObj:%p", pContext, pContext->fd,
pContext->user, affectRows, result); pContext->user, affectRows, result);
@ -308,8 +311,10 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo
} }
} }
void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) { void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int32_t unUsedCode) {
httpDispatchToResultQueue(param, result, unUsedCode, httpProcessSingleSqlCallBackImp); int32_t code = taos_errno(result);
int32_t affectRows = taos_affected_rows(result);
httpDispatchToResultQueue(param, result, code, affectRows, httpProcessSingleSqlCallBackImp);
} }
void httpProcessSingleSqlCmd(HttpContext *pContext) { void httpProcessSingleSqlCmd(HttpContext *pContext) {
@ -373,7 +378,7 @@ void httpExecCmd(HttpContext *pContext) {
} }
} }
void httpProcessRequestCb(void *param, TAOS_RES *result, int code) { void httpProcessRequestCb(void *param, TAOS_RES *result, int32_t code) {
HttpContext *pContext = param; HttpContext *pContext = param;
taos_free_result(result); taos_free_result(result);

View File

@ -89,11 +89,11 @@ typedef struct {
#pragma pack(pop) #pragma pack(pop)
typedef struct { typedef struct {
char *buffer; char * buffer;
int bufferSize; int32_t bufferSize;
char *offset; char * offset;
int forwards; int32_t forwards;
int code; int32_t code;
} SRecvBuffer; } SRecvBuffer;
typedef struct { typedef struct {
@ -107,10 +107,10 @@ typedef struct {
} SFwdInfo; } SFwdInfo;
typedef struct { typedef struct {
int first; int32_t first;
int last; int32_t last;
int fwds; // number of forwards int32_t fwds; // number of forwards
SFwdInfo fwdInfo[]; SFwdInfo fwdInfo[];
} SSyncFwds; } SSyncFwds;
typedef struct SsyncPeer { typedef struct SsyncPeer {
@ -123,15 +123,15 @@ typedef struct SsyncPeer {
int8_t sstatus; // sync status int8_t sstatus; // sync status
uint64_t version; uint64_t version;
uint64_t sversion; // track the peer version in retrieve process uint64_t sversion; // track the peer version in retrieve process
int syncFd; int32_t syncFd;
int peerFd; // forward FD int32_t peerFd; // forward FD
int numOfRetrieves; // number of retrieves tried int32_t numOfRetrieves; // number of retrieves tried
int fileChanged; // a flag to indicate file is changed during retrieving process int32_t fileChanged; // a flag to indicate file is changed during retrieving process
void * timer; void * timer;
void * pConn; void * pConn;
int notifyFd; int32_t notifyFd;
int watchNum; int32_t watchNum;
int * watchFd; int32_t *watchFd;
int8_t refCount; // reference count int8_t refCount; // reference count
struct SSyncNode *pSyncNode; struct SSyncNode *pSyncNode;
} SSyncPeer; } SSyncPeer;
@ -161,16 +161,16 @@ typedef struct SSyncNode {
} SSyncNode; } SSyncNode;
// sync module global // sync module global
extern int tsSyncNum; extern int32_t tsSyncNum;
extern char tsNodeFqdn[TSDB_FQDN_LEN]; extern char tsNodeFqdn[TSDB_FQDN_LEN];
void *syncRetrieveData(void *param); void *syncRetrieveData(void *param);
void *syncRestoreData(void *param); void *syncRestoreData(void *param);
int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead); int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead);
void syncRestartConnection(SSyncPeer *pPeer); void syncRestartConnection(SSyncPeer *pPeer);
void syncBroadcastStatus(SSyncNode *pNode); void syncBroadcastStatus(SSyncNode *pNode);
void syncAddPeerRef(SSyncPeer *pPeer); void syncAddPeerRef(SSyncPeer *pPeer);
int syncDecPeerRef(SSyncPeer *pPeer); int32_t syncDecPeerRef(SSyncPeer *pPeer);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -31,38 +31,38 @@
#include "syncInt.h" #include "syncInt.h"
// global configurable // global configurable
int tsMaxSyncNum = 2; int32_t tsMaxSyncNum = 2;
int tsSyncTcpThreads = 2; int32_t tsSyncTcpThreads = 2;
int tsMaxWatchFiles = 500; int32_t tsMaxWatchFiles = 500;
int tsMaxFwdInfo = 200; int32_t tsMaxFwdInfo = 200;
int tsSyncTimer = 1; int32_t tsSyncTimer = 1;
// module global, not configurable // module global, not configurable
int tsSyncNum; // number of sync in process in whole system int32_t tsSyncNum; // number of sync in process in whole system
char tsNodeFqdn[TSDB_FQDN_LEN]; char tsNodeFqdn[TSDB_FQDN_LEN];
static ttpool_h tsTcpPool; static ttpool_h tsTcpPool;
static void * syncTmrCtrl = NULL; static void * tsSyncTmrCtrl = NULL;
static void * vgIdHash; static void * tsVgIdHash;
static int tsSyncRefId = -1; static int32_t tsSyncRefId = -1;
// local functions // local functions
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
static void syncRecoverFromMaster(SSyncPeer *pPeer); static void syncRecoverFromMaster(SSyncPeer *pPeer);
static void syncCheckPeerConnection(void *param, void *tmrId); static void syncCheckPeerConnection(void *param, void *tmrId);
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack); static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack);
static void syncProcessBrokenLink(void *param); static void syncProcessBrokenLink(void *param);
static int syncProcessPeerMsg(void *param, void *buffer); static int32_t syncProcessPeerMsg(void *param, void *buffer);
static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp); static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp);
static void syncRemovePeer(SSyncPeer *pPeer); static void syncRemovePeer(SSyncPeer *pPeer);
static void syncAddArbitrator(SSyncNode *pNode); static void syncAddArbitrator(SSyncNode *pNode);
static void syncFreeNode(void *); static void syncFreeNode(void *);
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode); static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode);
static void syncMonitorFwdInfos(void *param, void *tmrId); static void syncMonitorFwdInfos(void *param, void *tmrId);
static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code); static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code);
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle); static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle);
static void syncRestartPeer(SSyncPeer *pPeer); static void syncRestartPeer(SSyncPeer *pPeer);
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtyp); static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtyp);
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo); static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo);
char* syncRole[] = { char* syncRole[] = {
@ -90,21 +90,21 @@ int32_t syncInit() {
return -1; return -1;
} }
syncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC"); tsSyncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC");
if (syncTmrCtrl == NULL) { if (tsSyncTmrCtrl == NULL) {
sError("failed to init tmrCtrl"); sError("failed to init tmrCtrl");
taosCloseTcpThreadPool(tsTcpPool); taosCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL; tsTcpPool = NULL;
return -1; return -1;
} }
vgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true); tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true);
if (vgIdHash == NULL) { if (tsVgIdHash == NULL) {
sError("failed to init vgIdHash"); sError("failed to init tsVgIdHash");
taosTmrCleanUp(syncTmrCtrl); taosTmrCleanUp(tsSyncTmrCtrl);
taosCloseTcpThreadPool(tsTcpPool); taosCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL; tsTcpPool = NULL;
syncTmrCtrl = NULL; tsSyncTmrCtrl = NULL;
return -1; return -1;
} }
@ -126,14 +126,14 @@ void syncCleanUp() {
tsTcpPool = NULL; tsTcpPool = NULL;
} }
if (syncTmrCtrl) { if (tsSyncTmrCtrl) {
taosTmrCleanUp(syncTmrCtrl); taosTmrCleanUp(tsSyncTmrCtrl);
syncTmrCtrl = NULL; tsSyncTmrCtrl = NULL;
} }
if (vgIdHash) { if (tsVgIdHash) {
taosHashCleanup(vgIdHash); taosHashCleanup(tsVgIdHash);
vgIdHash = NULL; tsVgIdHash = NULL;
} }
taosCloseRef(tsSyncRefId); taosCloseRef(tsSyncRefId);
@ -176,7 +176,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
return -1; return -1;
} }
for (int i = 0; i < pCfg->replica; ++i) { for (int32_t i = 0; i < pCfg->replica; ++i) {
const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i; const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i;
pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo); pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo);
if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) { if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) {
@ -204,7 +204,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
return -1; return -1;
} }
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl); pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, tsSyncTmrCtrl);
if (pNode->pFwdTimer == NULL) { if (pNode->pFwdTimer == NULL) {
sError("vgId:%d, failed to allocate timer", pNode->vgId); sError("vgId:%d, failed to allocate timer", pNode->vgId);
syncStop(pNode->rid); syncStop(pNode->rid);
@ -212,7 +212,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
} }
syncAddArbitrator(pNode); syncAddArbitrator(pNode);
taosHashPut(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *)); taosHashPut(tsVgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *));
if (pNode->notifyRole) { if (pNode->notifyRole) {
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
@ -231,10 +231,10 @@ void syncStop(int64_t rid) {
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
if (vgIdHash) taosHashRemove(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t)); if (tsVgIdHash) taosHashRemove(tsVgIdHash, (const char *)&pNode->vgId, sizeof(int32_t));
if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer); if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer);
for (int i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
if (pPeer) syncRemovePeer(pPeer); if (pPeer) syncRemovePeer(pPeer);
} }
@ -249,7 +249,7 @@ void syncStop(int64_t rid) {
} }
int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
int i, j; int32_t i, j;
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG;
@ -321,7 +321,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
return 0; return 0;
} }
int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int qtype) { int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int32_t qtype) {
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return 0; if (pNode == NULL) return 0;
@ -348,8 +348,8 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
pFwdRsp->version = version; pFwdRsp->version = version;
pFwdRsp->code = code; pFwdRsp->code = code;
int msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); int32_t msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp);
int retLen = write(pPeer->peerFd, msg, msgLen); int32_t retLen = write(pPeer->peerFd, msg, msgLen);
if (retLen == msgLen) { if (retLen == msgLen) {
sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version); sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version);
@ -377,7 +377,7 @@ void syncRecover(int64_t rid) {
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
for (int i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
pPeer = (SSyncPeer *)pNode->peerInfo[i]; pPeer = (SSyncPeer *)pNode->peerInfo[i];
if (pPeer->peerFd >= 0) { if (pPeer->peerFd >= 0) {
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
@ -389,12 +389,12 @@ void syncRecover(int64_t rid) {
taosReleaseRef(tsSyncRefId, rid); taosReleaseRef(tsSyncRefId, rid);
} }
int syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) { int32_t syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) {
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
if (pNode == NULL) return -1; if (pNode == NULL) return -1;
pNodesRole->selfIndex = pNode->selfIndex; pNodesRole->selfIndex = pNode->selfIndex;
for (int i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId; pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId;
pNodesRole->role[i] = pNode->peerInfo[i]->role; pNodesRole->role[i] = pNode->peerInfo[i]->role;
} }
@ -416,7 +416,7 @@ static void syncAddArbitrator(SSyncNode *pNode) {
SNodeInfo nodeInfo; SNodeInfo nodeInfo;
nodeInfo.nodeId = 0; nodeInfo.nodeId = 0;
int ret = taosGetFqdnPortFromEp(tsArbitrator, nodeInfo.nodeFqdn, &nodeInfo.nodePort); int32_t ret = taosGetFqdnPortFromEp(tsArbitrator, nodeInfo.nodeFqdn, &nodeInfo.nodePort);
if (-1 == ret) { if (-1 == ret) {
nodeInfo.nodePort = tsArbitratorPort; nodeInfo.nodePort = tsArbitratorPort;
} }
@ -444,7 +444,7 @@ static void syncFreeNode(void *param) {
void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); } void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); }
int syncDecPeerRef(SSyncPeer *pPeer) { int32_t syncDecPeerRef(SSyncPeer *pPeer) {
if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) {
taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid); taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid);
@ -495,12 +495,12 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
pPeer->refCount = 1; pPeer->refCount = 1;
sInfo("%s, it is configured", pPeer->id); sInfo("%s, it is configured", pPeer->id);
int ret = strcmp(pPeer->fqdn, tsNodeFqdn); int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) { if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
int32_t checkMs = 100 + (pNode->vgId * 10) % 100; int32_t checkMs = 100 + (pNode->vgId * 10) % 100;
if (pNode->vgId > 1) checkMs = tsStatusInterval * 2000 + checkMs; if (pNode->vgId > 1) checkMs = tsStatusInterval * 2000 + checkMs;
sDebug("%s, start to check peer connection after %d ms", pPeer->id, checkMs); sDebug("%s, start to check peer connection after %d ms", pPeer->id, checkMs);
taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, tsSyncTmrCtrl, &pPeer->timer);
} }
taosAcquireRef(tsSyncRefId, pNode->rid); taosAcquireRef(tsSyncRefId, pNode->rid);
@ -510,7 +510,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
void syncBroadcastStatus(SSyncNode *pNode) { void syncBroadcastStatus(SSyncNode *pNode) {
SSyncPeer *pPeer; SSyncPeer *pPeer;
for (int i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
if (i == pNode->selfIndex) continue; if (i == pNode->selfIndex) continue;
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
syncSendPeersStatusMsgToPeer(pPeer, 1); syncSendPeersStatusMsgToPeer(pPeer, 1);
@ -518,7 +518,7 @@ void syncBroadcastStatus(SSyncNode *pNode) {
} }
static void syncResetFlowCtrl(SSyncNode *pNode) { static void syncResetFlowCtrl(SSyncNode *pNode) {
for (int i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
pNode->peerInfo[i]->numOfRetrieves = 0; pNode->peerInfo[i]->numOfRetrieves = 0;
} }
@ -529,13 +529,13 @@ static void syncResetFlowCtrl(SSyncNode *pNode) {
static void syncChooseMaster(SSyncNode *pNode) { static void syncChooseMaster(SSyncNode *pNode) {
SSyncPeer *pPeer; SSyncPeer *pPeer;
int onlineNum = 0; int32_t onlineNum = 0;
int index = -1; int32_t index = -1;
int replica = pNode->replica; int32_t replica = pNode->replica;
sDebug("vgId:%d, choose master", pNode->vgId); sDebug("vgId:%d, choose master", pNode->vgId);
for (int i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) { if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) {
onlineNum++; onlineNum++;
} }
@ -544,7 +544,7 @@ static void syncChooseMaster(SSyncNode *pNode) {
if (onlineNum == pNode->replica) { if (onlineNum == pNode->replica) {
// if all peers are online, peer with highest version shall be master // if all peers are online, peer with highest version shall be master
index = 0; index = 0;
for (int i = 1; i < pNode->replica; ++i) { for (int32_t i = 1; i < pNode->replica; ++i) {
if (pNode->peerInfo[i]->version > pNode->peerInfo[index]->version) { if (pNode->peerInfo[i]->version > pNode->peerInfo[index]->version) {
index = i; index = i;
} }
@ -560,7 +560,7 @@ static void syncChooseMaster(SSyncNode *pNode) {
if (index < 0 && onlineNum > replica / 2.0) { if (index < 0 && onlineNum > replica / 2.0) {
// over half of nodes are online // over half of nodes are online
for (int i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
// slave with highest version shall be master // slave with highest version shall be master
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
if (pPeer->role == TAOS_SYNC_ROLE_SLAVE || pPeer->role == TAOS_SYNC_ROLE_MASTER) { if (pPeer->role == TAOS_SYNC_ROLE_SLAVE || pPeer->role == TAOS_SYNC_ROLE_MASTER) {
@ -587,11 +587,11 @@ static void syncChooseMaster(SSyncNode *pNode) {
} }
static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
int onlineNum = 0; int32_t onlineNum = 0;
int index = -1; int32_t index = -1;
int replica = pNode->replica; int32_t replica = pNode->replica;
for (int i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) { if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) {
onlineNum++; onlineNum++;
} }
@ -612,7 +612,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica); sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica);
} }
} else { } else {
for (int i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
SSyncPeer *pTemp = pNode->peerInfo[i]; SSyncPeer *pTemp = pNode->peerInfo[i];
if (pTemp->role != TAOS_SYNC_ROLE_MASTER) continue; if (pTemp->role != TAOS_SYNC_ROLE_MASTER) continue;
if (index < 0) { if (index < 0) {
@ -631,9 +631,9 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
return pMaster; return pMaster;
} }
static int syncValidateMaster(SSyncPeer *pPeer) { static int32_t syncValidateMaster(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int code = 0; int32_t code = 0;
if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) { if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) {
sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id); sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id);
@ -641,7 +641,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) {
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
code = -1; code = -1;
for (int i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
if (i == pNode->selfIndex) continue; if (i == pNode->selfIndex) continue;
syncRestartPeer(pNode->peerInfo[i]); syncRestartPeer(pNode->peerInfo[i]);
} }
@ -683,7 +683,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
} }
} else { } else {
// master not there, if all peer's state and version are consistent, choose the master // master not there, if all peer's state and version are consistent, choose the master
int consistent = 0; int32_t consistent = 0;
if (peersStatus) { if (peersStatus) {
for (i = 0; i < pNode->replica; ++i) { for (i = 0; i < pNode->replica; ++i) {
SSyncPeer *pTemp = pNode->peerInfo[i]; SSyncPeer *pTemp = pNode->peerInfo[i];
@ -721,9 +721,9 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
pPeer->sstatus = TAOS_SYNC_STATUS_INIT; pPeer->sstatus = TAOS_SYNC_STATUS_INIT;
int ret = strcmp(pPeer->fqdn, tsNodeFqdn); int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) { if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) {
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
} }
} }
@ -757,7 +757,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
pthread_t thread; pthread_t thread;
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
int ret = pthread_create(&thread, &thattr, syncRetrieveData, pPeer); int32_t ret = pthread_create(&thread, &thattr, syncRetrieveData, pPeer);
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
if (ret != 0) { if (ret != 0) {
@ -802,7 +802,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
// Ensure the sync of mnode not interrupted // Ensure the sync of mnode not interrupted
if (pNode->vgId != 1 && tsSyncNum >= tsMaxSyncNum) { if (pNode->vgId != 1 && tsSyncNum >= tsMaxSyncNum) {
sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum); sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum);
taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId * 10) % 200, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId * 10) % 200, pPeer, tsSyncTmrCtrl, &pPeer->timer);
return; return;
} }
@ -815,7 +815,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead); firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead);
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort; firstPkt.port = tsSyncPort;
taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
if (write(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { if (write(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
sError("%s, failed to send sync-req to peer", pPeer->id); sError("%s, failed to send sync-req to peer", pPeer->id);
@ -836,7 +836,7 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) { if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
// find the forwardInfo from first // find the forwardInfo from first
for (int i = 0; i < pSyncFwds->fwds; ++i) { for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo; pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo;
if (pFwdRsp->version == pFwdInfo->version) break; if (pFwdRsp->version == pFwdInfo->version) break;
} }
@ -879,10 +879,10 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
} }
} }
static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
if (pPeer->peerFd < 0) return -1; if (pPeer->peerFd < 0) return -1;
int hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead)); int32_t hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead));
if (hlen != sizeof(SSyncHead)) { if (hlen != sizeof(SSyncHead)) {
sDebug("%s, failed to read msg, hlen:%d", pPeer->id, hlen); sDebug("%s, failed to read msg, hlen:%d", pPeer->id, hlen);
return -1; return -1;
@ -894,7 +894,7 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
return -1; return -1;
} }
int bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len); int32_t bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len);
if (bytes != pHead->len) { if (bytes != pHead->len) {
sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len); sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len);
return -1; return -1;
@ -903,7 +903,7 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
return 0; return 0;
} }
static int syncProcessPeerMsg(void *param, void *buffer) { static int32_t syncProcessPeerMsg(void *param, void *buffer) {
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncHead head; SSyncHead head;
char * cont = buffer; char * cont = buffer;
@ -911,7 +911,7 @@ static int syncProcessPeerMsg(void *param, void *buffer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
int code = syncReadPeerMsg(pPeer, &head, cont); int32_t code = syncReadPeerMsg(pPeer, &head, cont);
if (code == 0) { if (code == 0) {
if (head.type == TAOS_SMSG_FORWARD) { if (head.type == TAOS_SMSG_FORWARD) {
@ -948,12 +948,12 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
pPeersStatus->role = nodeRole; pPeersStatus->role = nodeRole;
pPeersStatus->ack = ack; pPeersStatus->ack = ack;
for (int i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
pPeersStatus->peersStatus[i].role = pNode->peerInfo[i]->role; pPeersStatus->peersStatus[i].role = pNode->peerInfo[i]->role;
pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version; pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version;
} }
int retLen = write(pPeer->peerFd, msg, statusMsgLen); int32_t retLen = write(pPeer->peerFd, msg, statusMsgLen);
if (retLen == statusMsgLen) { if (retLen == statusMsgLen) {
sDebug("%s, status msg is sent, self:%s ver:%" PRIu64 ", ack:%d", pPeer->id, syncRole[pPeersStatus->role], sDebug("%s, status msg is sent, self:%s ver:%" PRIu64 ", ack:%d", pPeer->id, syncRole[pPeersStatus->role],
pPeersStatus->version, pPeersStatus->ack); pPeersStatus->version, pPeersStatus->ack);
@ -975,10 +975,10 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
return; return;
} }
int connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
if (connFd < 0) { if (connFd < 0) {
sDebug("%s, failed to open tcp socket(%s)", pPeer->id, strerror(errno)); sDebug("%s, failed to open tcp socket(%s)", pPeer->id, strerror(errno));
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
return; return;
} }
@ -999,7 +999,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
} else { } else {
sDebug("try later"); sDebug("try later");
close(connFd); close(connFd);
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
} }
} }
@ -1024,7 +1024,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
syncAddPeerRef(pPeer); syncAddPeerRef(pPeer);
int ret = pthread_create(&(thread), &thattr, (void *)syncRestoreData, pPeer); int32_t ret = pthread_create(&(thread), &thattr, (void *)syncRestoreData, pPeer);
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
if (ret < 0) { if (ret < 0) {
@ -1036,9 +1036,9 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
} }
} }
static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) { static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
char ipstr[24]; char ipstr[24];
int i; int32_t i;
tinet_ntoa(ipstr, sourceIp); tinet_ntoa(ipstr, sourceIp);
sDebug("peer TCP connection from ip:%s", ipstr); sDebug("peer TCP connection from ip:%s", ipstr);
@ -1051,7 +1051,7 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) {
} }
int32_t vgId = firstPkt.syncHead.vgId; int32_t vgId = firstPkt.syncHead.vgId;
SSyncNode **ppNode = (SSyncNode **)taosHashGet(vgIdHash, (const char *)&vgId, sizeof(int32_t)); SSyncNode **ppNode = (SSyncNode **)taosHashGet(tsVgIdHash, (const char *)&vgId, sizeof(int32_t));
if (ppNode == NULL || *ppNode == NULL) { if (ppNode == NULL || *ppNode == NULL) {
sError("vgId:%d, vgId could not be found", vgId); sError("vgId:%d, vgId could not be found", vgId);
taosCloseSocket(connFd); taosCloseSocket(connFd);
@ -1137,8 +1137,8 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) { static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
int fwds = pSyncFwds->fwds; int32_t fwds = pSyncFwds->fwds;
for (int i = 0; i < fwds; ++i) { for (int32_t i = 0; i < fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first;
if (pFwdInfo->confirmed == 0) break; if (pFwdInfo->confirmed == 0) break;
@ -1152,7 +1152,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
} }
static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code) { static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code) {
int confirm = 0; int32_t confirm = 0;
if (pFwdInfo->code == 0) pFwdInfo->code = code; if (pFwdInfo->code == 0) pFwdInfo->code = code;
if (code == 0) { if (code == 0) {
@ -1186,7 +1186,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
if (pSyncFwds->fwds > 0) { if (pSyncFwds->fwds > 0) {
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
for (int i = 0; i < pSyncFwds->fwds; ++i) { for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo;
if (time - pFwdInfo->time < 2000) break; if (time - pFwdInfo->time < 2000) break;
syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL);
@ -1196,23 +1196,23 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
} }
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl); pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, tsSyncTmrCtrl);
} }
taosReleaseRef(tsSyncRefId, rid); taosReleaseRef(tsSyncRefId, rid);
} }
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) { static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype) {
SSyncPeer *pPeer; SSyncPeer *pPeer;
SSyncHead *pSyncHead; SSyncHead *pSyncHead;
SWalHead * pWalHead = data; SWalHead * pWalHead = data;
int fwdLen; int32_t fwdLen;
int32_t code = 0; int32_t code = 0;
if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) { if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) {
sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId, sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId,
pWalHead->version, nodeVersion); pWalHead->version, nodeVersion);
for (int i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
} }
@ -1238,7 +1238,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
for (int i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
if (pPeer == NULL || pPeer->peerFd < 0) continue; if (pPeer == NULL || pPeer->peerFd < 0) continue;
if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;
@ -1248,7 +1248,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
code = 1; code = 1;
} }
int retLen = write(pPeer->peerFd, pSyncHead, fwdLen); int32_t retLen = write(pPeer->peerFd, pSyncHead, fwdLen);
if (retLen == fwdLen) { if (retLen == fwdLen) {
sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len);
} else { } else {

View File

@ -48,12 +48,12 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex
} }
} }
static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */ // master file info SFileInfo minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */ // master file info
SFileInfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */ // slave file info SFileInfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */ // slave file info
SFileAck fileAck; SFileAck fileAck;
int code = -1; int32_t code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0}; char name[TSDB_FILENAME_LEN * 2] = {0};
uint32_t pindex = 0; // index in last restore uint32_t pindex = 0; // index in last restore
bool fileChanged = false; bool fileChanged = false;
@ -62,7 +62,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
sinfo.index = 0; sinfo.index = 0;
while (1) { while (1) {
// read file info // read file info
int ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo)); int32_t ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo));
if (ret < 0) break; if (ret < 0) break;
// if no more file from master, break; // if no more file from master, break;
@ -104,7 +104,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
minfo.name[sizeof(minfo.name) - 1] = 0; minfo.name[sizeof(minfo.name) - 1] = 0;
snprintf(name, sizeof(name), "%s/%s", pNode->path, minfo.name); snprintf(name, sizeof(name), "%s/%s", pNode->path, minfo.name);
int dfd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); int32_t dfd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
if (dfd < 0) { if (dfd < 0) {
sError("%s, failed to open file:%s", pPeer->id, name); sError("%s, failed to open file:%s", pPeer->id, name);
break; break;
@ -132,9 +132,9 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
return code; return code;
} }
static int syncRestoreWal(SSyncPeer *pPeer) { static int32_t syncRestoreWal(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int ret, code = -1; int32_t ret, code = -1;
void *buffer = calloc(1024000, 1); // size for one record void *buffer = calloc(1024000, 1); // size for one record
if (buffer == NULL) return -1; if (buffer == NULL) return -1;
@ -175,10 +175,10 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) {
return offset; return offset;
} }
static int syncProcessBufferedFwd(SSyncPeer *pPeer) { static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) {
SSyncNode * pNode = pPeer->pSyncNode; SSyncNode * pNode = pPeer->pSyncNode;
SRecvBuffer *pRecv = pNode->pRecv; SRecvBuffer *pRecv = pNode->pRecv;
int forwards = 0; int32_t forwards = 0;
sDebug("%s, number of buffered forwards:%d", pPeer->id, pRecv->forwards); sDebug("%s, number of buffered forwards:%d", pPeer->id, pRecv->forwards);
@ -203,12 +203,12 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer) {
return pRecv->code; return pRecv->code;
} }
int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) { int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) {
SSyncNode * pNode = pPeer->pSyncNode; SSyncNode * pNode = pPeer->pSyncNode;
SRecvBuffer *pRecv = pNode->pRecv; SRecvBuffer *pRecv = pNode->pRecv;
if (pRecv == NULL) return -1; if (pRecv == NULL) return -1;
int len = pHead->len + sizeof(SWalHead); int32_t len = pHead->len + sizeof(SWalHead);
if (pRecv->bufferSize - (pRecv->offset - pRecv->buffer) >= len) { if (pRecv->bufferSize - (pRecv->offset - pRecv->buffer) >= len) {
memcpy(pRecv->offset, pHead, len); memcpy(pRecv->offset, pHead, len);
@ -231,7 +231,7 @@ static void syncCloseRecvBuffer(SSyncNode *pNode) {
tfree(pNode->pRecv); tfree(pNode->pRecv);
} }
static int syncOpenRecvBuffer(SSyncNode *pNode) { static int32_t syncOpenRecvBuffer(SSyncNode *pNode) {
syncCloseRecvBuffer(pNode); syncCloseRecvBuffer(pNode);
SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1); SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1);
@ -252,13 +252,13 @@ static int syncOpenRecvBuffer(SSyncNode *pNode) {
return 0; return 0;
} }
static int syncRestoreDataStepByStep(SSyncPeer *pPeer) { static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
nodeSStatus = TAOS_SYNC_STATUS_FILE; nodeSStatus = TAOS_SYNC_STATUS_FILE;
uint64_t fversion = 0; uint64_t fversion = 0;
sDebug("%s, start to restore file", pPeer->id); sDebug("%s, start to restore file", pPeer->id);
int code = syncRestoreFile(pPeer, &fversion); int32_t code = syncRestoreFile(pPeer, &fversion);
if (code < 0) { if (code < 0) {
sError("%s, failed to restore file", pPeer->id); sError("%s, failed to restore file", pPeer->id);
return -1; return -1;

View File

@ -27,7 +27,7 @@
#include "tsync.h" #include "tsync.h"
#include "syncInt.h" #include "syncInt.h"
static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { static int32_t syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
sDebug("%s, start to monitor:%s", pPeer->id, name); sDebug("%s, start to monitor:%s", pPeer->id, name);
if (pPeer->notifyFd <= 0) { if (pPeer->notifyFd <= 0) {
@ -38,16 +38,16 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
return -1; return -1;
} }
if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int) * tsMaxWatchFiles); if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int32_t) * tsMaxWatchFiles);
if (pPeer->watchFd == NULL) { if (pPeer->watchFd == NULL) {
sError("%s, failed to allocate watchFd", pPeer->id); sError("%s, failed to allocate watchFd", pPeer->id);
return -1; return -1;
} }
memset(pPeer->watchFd, -1, sizeof(int) * tsMaxWatchFiles); memset(pPeer->watchFd, -1, sizeof(int32_t) * tsMaxWatchFiles);
} }
int *wd = pPeer->watchFd + pPeer->watchNum; int32_t *wd = pPeer->watchFd + pPeer->watchNum;
if (*wd >= 0) { if (*wd >= 0) {
if (inotify_rm_watch(pPeer->notifyFd, *wd) < 0) { if (inotify_rm_watch(pPeer->notifyFd, *wd) < 0) {
@ -69,17 +69,17 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
return 0; return 0;
} }
static int syncAreFilesModified(SSyncPeer *pPeer) { static int32_t syncAreFilesModified(SSyncPeer *pPeer) {
if (pPeer->notifyFd <= 0) return 0; if (pPeer->notifyFd <= 0) return 0;
char buf[2048]; char buf[2048];
int len = read(pPeer->notifyFd, buf, sizeof(buf)); int32_t len = read(pPeer->notifyFd, buf, sizeof(buf));
if (len < 0 && errno != EAGAIN) { if (len < 0 && errno != EAGAIN) {
sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno));
return -1; return -1;
} }
int code = 0; int32_t code = 0;
if (len > 0) { if (len > 0) {
const struct inotify_event *event; const struct inotify_event *event;
char *ptr; char *ptr;
@ -97,11 +97,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer) {
return code; return code;
} }
static int syncRetrieveFile(SSyncPeer *pPeer) { static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo fileInfo; SFileInfo fileInfo;
SFileAck fileAck; SFileAck fileAck;
int code = -1; int32_t code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0}; char name[TSDB_FILENAME_LEN * 2] = {0};
memset(&fileInfo, 0, sizeof(fileInfo)); memset(&fileInfo, 0, sizeof(fileInfo));
@ -146,7 +146,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) {
} }
// send the file to peer // send the file to peer
int sfd = open(name, O_RDONLY); int32_t sfd = open(name, O_RDONLY);
if (sfd < 0) break; if (sfd < 0) break;
ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
@ -169,8 +169,8 @@ static int syncRetrieveFile(SSyncPeer *pPeer) {
/* if only a partial record is read out, set the IN_MODIFY flag in event, /* if only a partial record is read out, set the IN_MODIFY flag in event,
so upper layer will reload the file to get a complete record */ so upper layer will reload the file to get a complete record */
static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) { static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead, uint32_t *pEvent) {
int ret; int32_t ret;
ret = read(sfd, pHead, sizeof(SWalHead)); ret = read(sfd, pHead, sizeof(SWalHead));
if (ret < 0) return -1; if (ret < 0) return -1;
@ -194,7 +194,7 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) {
return sizeof(SWalHead) + pHead->len; return sizeof(SWalHead) + pHead->len;
} }
static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) { static int32_t syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
pPeer->watchNum = 0; pPeer->watchNum = 0;
taosClose(pPeer->notifyFd); taosClose(pPeer->notifyFd);
pPeer->notifyFd = inotify_init1(IN_NONBLOCK); pPeer->notifyFd = inotify_init1(IN_NONBLOCK);
@ -203,14 +203,14 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
return -1; return -1;
} }
if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int) * tsMaxWatchFiles); if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int32_t) * tsMaxWatchFiles);
if (pPeer->watchFd == NULL) { if (pPeer->watchFd == NULL) {
sError("%s, failed to allocate watchFd", pPeer->id); sError("%s, failed to allocate watchFd", pPeer->id);
return -1; return -1;
} }
memset(pPeer->watchFd, -1, sizeof(int) * tsMaxWatchFiles); memset(pPeer->watchFd, -1, sizeof(int32_t) * tsMaxWatchFiles);
int *wd = pPeer->watchFd; int32_t *wd = pPeer->watchFd;
*wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_CLOSE_WRITE); *wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_CLOSE_WRITE);
if (*wd == -1) { if (*wd == -1) {
@ -222,8 +222,8 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
} }
static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) { static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
char buf[2048]; char buf[2048];
int len = read(pPeer->notifyFd, buf, sizeof(buf)); int32_t len = read(pPeer->notifyFd, buf, sizeof(buf));
if (len < 0 && errno != EAGAIN) { if (len < 0 && errno != EAGAIN) {
sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno));
return -1; return -1;
@ -243,11 +243,11 @@ static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
return 0; return 0;
} }
static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) { static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) {
SWalHead *pHead = malloc(640000); SWalHead *pHead = malloc(640000);
int code = -1; int32_t code = -1;
int32_t bytes = 0; int32_t bytes = 0;
int sfd; int32_t sfd;
sfd = open(name, O_RDONLY); sfd = open(name, O_RDONLY);
if (sfd < 0) { if (sfd < 0) {
@ -259,7 +259,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
sDebug("%s, retrieve last wal, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, offset, fversion); sDebug("%s, retrieve last wal, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, offset, fversion);
while (1) { while (1) {
int wsize = syncReadOneWalRecord(sfd, pHead, pEvent); int32_t wsize = syncReadOneWalRecord(sfd, pHead, pEvent);
if (wsize < 0) break; if (wsize < 0) break;
if (wsize == 0) { if (wsize == 0) {
code = 0; code = 0;
@ -267,7 +267,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
} }
sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version); sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version);
int ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
if (ret != wsize) break; if (ret != wsize) break;
pPeer->sversion = pHead->version; pPeer->sversion = pHead->version;
@ -287,9 +287,9 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
return -1; return -1;
} }
static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) { static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int code = -1; int32_t code = -1;
char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file
if (syncAreFilesModified(pPeer) != 0) return -1; if (syncAreFilesModified(pPeer) != 0) return -1;
@ -370,13 +370,13 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) {
return code; return code;
} }
static int syncRetrieveWal(SSyncPeer *pPeer) { static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
SSyncNode * pNode = pPeer->pSyncNode; SSyncNode * pNode = pPeer->pSyncNode;
char fname[TSDB_FILENAME_LEN * 3]; char fname[TSDB_FILENAME_LEN * 3];
char wname[TSDB_FILENAME_LEN * 2]; char wname[TSDB_FILENAME_LEN * 2];
int32_t size; int32_t size;
struct stat fstat; struct stat fstat;
int code = -1; int32_t code = -1;
int64_t index = 0; int64_t index = 0;
while (1) { while (1) {
@ -403,7 +403,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) {
size = fstat.st_size; size = fstat.st_size;
sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size); sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size);
int sfd = open(fname, O_RDONLY); int32_t sfd = open(fname, O_RDONLY);
if (sfd < 0) break; if (sfd < 0) break;
code = taosSendFile(pPeer->syncFd, sfd, NULL, size); code = taosSendFile(pPeer->syncFd, sfd, NULL, size);
@ -428,7 +428,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) {
return code; return code;
} }
static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) { static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFirstPkt firstPkt; SFirstPkt firstPkt;

View File

@ -28,22 +28,22 @@
#include "syncInt.h" #include "syncInt.h"
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context);
static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp); static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp);
static void arbProcessBrokenLink(void *param); static void arbProcessBrokenLink(void *param);
static int arbProcessPeerMsg(void *param, void *buffer); static int32_t arbProcessPeerMsg(void *param, void *buffer);
static tsem_t tsArbSem; static tsem_t tsArbSem;
static ttpool_h tsArbTcpPool; static ttpool_h tsArbTcpPool;
typedef struct { typedef struct {
char id[TSDB_EP_LEN + 24]; char id[TSDB_EP_LEN + 24];
int nodeFd; int32_t nodeFd;
void *pConn; void * pConn;
} SNodeConn; } SNodeConn;
int main(int argc, char *argv[]) { int32_t main(int32_t argc, char *argv[]) {
char arbLogPath[TSDB_FILENAME_LEN + 16] = {0}; char arbLogPath[TSDB_FILENAME_LEN + 16] = {0};
for (int i = 1; i < argc; ++i) { for (int32_t i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
tsArbitratorPort = atoi(argv[++i]); tsArbitratorPort = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) { } else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
@ -108,7 +108,7 @@ int main(int argc, char *argv[]) {
return 0; return 0;
} }
static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp) { static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
char ipstr[24]; char ipstr[24];
tinet_ntoa(ipstr, sourceIp); tinet_ntoa(ipstr, sourceIp);
sDebug("peer TCP connection from ip:%s", ipstr); sDebug("peer TCP connection from ip:%s", ipstr);
@ -150,13 +150,13 @@ static void arbProcessBrokenLink(void *param) {
tfree(pNode); tfree(pNode);
} }
static int arbProcessPeerMsg(void *param, void *buffer) { static int32_t arbProcessPeerMsg(void *param, void *buffer) {
SNodeConn *pNode = param; SNodeConn *pNode = param;
SSyncHead head; SSyncHead head;
int bytes = 0; int32_t bytes = 0;
char * cont = (char *)buffer; char * cont = (char *)buffer;
int hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(head)); int32_t hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(head));
if (hlen != sizeof(head)) { if (hlen != sizeof(head)) {
sDebug("%s, failed to read msg, hlen:%d", pNode->id, hlen); sDebug("%s, failed to read msg, hlen:%d", pNode->id, hlen);
return -1; return -1;