TD-2444 TD-2224
This commit is contained in:
parent
3cfad2ed2f
commit
25bde6dad9
|
@ -266,6 +266,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sy
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TOO_MANY_FWDINFO, 0, 0x0904, "Too many sync fwd infos")
|
||||||
|
|
||||||
// wal
|
// wal
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
|
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
|
||||||
|
|
|
@ -121,7 +121,6 @@ extern char *syncRole[];
|
||||||
//global configurable parameters
|
//global configurable parameters
|
||||||
extern int32_t tsMaxSyncNum;
|
extern int32_t tsMaxSyncNum;
|
||||||
extern int32_t tsSyncTcpThreads;
|
extern int32_t tsSyncTcpThreads;
|
||||||
extern int32_t tsMaxWatchFiles;
|
|
||||||
extern int32_t tsSyncTimer;
|
extern int32_t tsSyncTimer;
|
||||||
extern int32_t tsMaxFwdInfo;
|
extern int32_t tsMaxFwdInfo;
|
||||||
extern int32_t sDebugFlag;
|
extern int32_t sDebugFlag;
|
||||||
|
|
|
@ -32,8 +32,7 @@
|
||||||
// global configurable
|
// global configurable
|
||||||
int32_t tsMaxSyncNum = 2;
|
int32_t tsMaxSyncNum = 2;
|
||||||
int32_t tsSyncTcpThreads = 2;
|
int32_t tsSyncTcpThreads = 2;
|
||||||
int32_t tsMaxWatchFiles = 500;
|
int32_t tsMaxFwdInfo = 512;
|
||||||
int32_t tsMaxFwdInfo = 200;
|
|
||||||
int32_t tsSyncTimer = 1;
|
int32_t tsSyncTimer = 1;
|
||||||
|
|
||||||
// module global, not configurable
|
// module global, not configurable
|
||||||
|
@ -60,7 +59,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode);
|
||||||
static void syncMonitorFwdInfos(void *param, void *tmrId);
|
static void syncMonitorFwdInfos(void *param, void *tmrId);
|
||||||
static void syncMonitorNodeRole(void *param, void *tmrId);
|
static void syncMonitorNodeRole(void *param, void *tmrId);
|
||||||
static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code);
|
static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code);
|
||||||
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle);
|
static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle);
|
||||||
static void syncRestartPeer(SSyncPeer *pPeer);
|
static void syncRestartPeer(SSyncPeer *pPeer);
|
||||||
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtyp);
|
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtyp);
|
||||||
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo);
|
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo);
|
||||||
|
@ -892,15 +891,24 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
|
||||||
sTrace("%s, forward-rsp is received, code:%x hver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version);
|
sTrace("%s, forward-rsp is received, code:%x hver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version);
|
||||||
SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
|
SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
|
||||||
|
|
||||||
|
bool found = false;
|
||||||
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
|
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
|
||||||
// find the forwardInfo from first
|
// find the forwardInfo from first
|
||||||
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
|
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
|
||||||
pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo;
|
pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo;
|
||||||
if (pFwdRsp->version == pFwdInfo->version) break;
|
if (pFwdRsp->version == pFwdInfo->version) {
|
||||||
|
found = true;
|
||||||
|
syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code);
|
||||||
|
syncRemoveConfirmedFwdInfo(pNode);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!found) {
|
||||||
|
sTrace("%s, forward-rsp not found first:%d fwds:%d, code:%x hver:%" PRIu64, pPeer->id, pSyncFwds->first,
|
||||||
|
pSyncFwds->fwds, pFwdRsp->code, pFwdRsp->version);
|
||||||
syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code);
|
syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code);
|
||||||
syncRemoveConfirmedFwdInfo(pNode);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1180,13 +1188,15 @@ static void syncProcessBrokenLink(void *param) {
|
||||||
taosReleaseRef(tsSyncRefId, pNode->rid);
|
taosReleaseRef(tsSyncRefId, pNode->rid);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
|
static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
|
||||||
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
|
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
|
||||||
int64_t time = taosGetTimestampMs();
|
int64_t time = taosGetTimestampMs();
|
||||||
|
|
||||||
if (pSyncFwds->fwds >= tsMaxFwdInfo) {
|
if (pSyncFwds->fwds >= tsMaxFwdInfo) {
|
||||||
pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo;
|
// pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo;
|
||||||
pSyncFwds->fwds--;
|
// pSyncFwds->fwds--;
|
||||||
|
sError("vgId:%d, failed to save fwd info, hver:%" PRIu64 " fwds:%d", pNode->vgId, version, pSyncFwds->fwds);
|
||||||
|
return TSDB_CODE_SYN_TOO_MANY_FWDINFO;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSyncFwds->fwds > 0) {
|
if (pSyncFwds->fwds > 0) {
|
||||||
|
@ -1201,6 +1211,8 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
|
||||||
|
|
||||||
pSyncFwds->fwds++;
|
pSyncFwds->fwds++;
|
||||||
sTrace("vgId:%d, fwd info is saved, hver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds);
|
sTrace("vgId:%d, fwd info is saved, hver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds);
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
|
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
|
||||||
|
@ -1214,8 +1226,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
|
||||||
pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo;
|
pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo;
|
||||||
pSyncFwds->fwds--;
|
pSyncFwds->fwds--;
|
||||||
if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last;
|
if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last;
|
||||||
// sDebug("vgId:%d, fwd info is removed, hver:%d, fwds:%d",
|
sTrace("vgId:%d, fwd info is removed, hver:%" PRIu64 " fwds:%d", pNode->vgId, pFwdInfo->version, pSyncFwds->fwds);
|
||||||
// pNode->vgId, pFwdInfo->version, pSyncFwds->fwds);
|
|
||||||
memset(pFwdInfo, 0, sizeof(SFwdInfo));
|
memset(pFwdInfo, 0, sizeof(SFwdInfo));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1341,8 +1352,8 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
|
||||||
if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;
|
if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;
|
||||||
|
|
||||||
if (pNode->quorum > 1 && code == 0) {
|
if (pNode->quorum > 1 && code == 0) {
|
||||||
syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
|
code = syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
|
||||||
code = 1;
|
if (code >= 0) code = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t retLen = taosWriteMsg(pPeer->peerFd, pSyncHead, fwdLen);
|
int32_t retLen = taosWriteMsg(pPeer->peerFd, pSyncHead, fwdLen);
|
||||||
|
|
|
@ -32,6 +32,7 @@ int32_t numOfThreads = 30;
|
||||||
int32_t numOfTables = 100000;
|
int32_t numOfTables = 100000;
|
||||||
int32_t replica = 1;
|
int32_t replica = 1;
|
||||||
int32_t numOfColumns = 2;
|
int32_t numOfColumns = 2;
|
||||||
|
TAOS * con = NULL;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t tableBeginIndex;
|
int32_t tableBeginIndex;
|
||||||
|
@ -84,13 +85,14 @@ int main(int argc, char *argv[]) {
|
||||||
|
|
||||||
pthread_attr_destroy(&thattr);
|
pthread_attr_destroy(&thattr);
|
||||||
free(pInfo);
|
free(pInfo);
|
||||||
|
taos_close(con);
|
||||||
}
|
}
|
||||||
|
|
||||||
void createDbAndSTable() {
|
void createDbAndSTable() {
|
||||||
pPrint("start to create db and stable");
|
pPrint("start to create db and stable");
|
||||||
char qstr[64000];
|
char qstr[64000];
|
||||||
|
|
||||||
TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
con = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||||
if (con == NULL) {
|
if (con == NULL) {
|
||||||
pError("failed to connect to DB, reason:%s", taos_errstr(con));
|
pError("failed to connect to DB, reason:%s", taos_errstr(con));
|
||||||
exit(1);
|
exit(1);
|
||||||
|
@ -127,8 +129,6 @@ void createDbAndSTable() {
|
||||||
exit(0);
|
exit(0);
|
||||||
}
|
}
|
||||||
taos_free_result(pSql);
|
taos_free_result(pSql);
|
||||||
|
|
||||||
taos_close(con);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void *threadFunc(void *param) {
|
void *threadFunc(void *param) {
|
||||||
|
@ -136,12 +136,6 @@ void *threadFunc(void *param) {
|
||||||
char qstr[65000];
|
char qstr[65000];
|
||||||
int code;
|
int code;
|
||||||
|
|
||||||
TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
|
||||||
if (con == NULL) {
|
|
||||||
pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(con));
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
sprintf(qstr, "use %s", pInfo->dbName);
|
sprintf(qstr, "use %s", pInfo->dbName);
|
||||||
TAOS_RES *pSql = taos_query(con, qstr);
|
TAOS_RES *pSql = taos_query(con, qstr);
|
||||||
taos_free_result(pSql);
|
taos_free_result(pSql);
|
||||||
|
@ -170,7 +164,6 @@ void *threadFunc(void *param) {
|
||||||
pInfo->createTableSpeed = speed;
|
pInfo->createTableSpeed = speed;
|
||||||
|
|
||||||
pPrint("thread:%d, time:%.2f sec, speed:%.1f tables/second, ", pInfo->threadIndex, seconds, speed);
|
pPrint("thread:%d, time:%.2f sec, speed:%.1f tables/second, ", pInfo->threadIndex, seconds, speed);
|
||||||
taos_close(con);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue