commit
8e525a3702
|
@ -222,7 +222,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
|
|||
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
|
||||
} else {
|
||||
if (qtype == TAOS_QTYPE_FWD) {
|
||||
vnodeConfirmForward(pVnode, pWrite->pHead.version, 0);
|
||||
vnodeConfirmForward(pVnode, pWrite->pHead.version, 0, pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
|
||||
}
|
||||
if (pWrite->rspRet.rsp) {
|
||||
rpcFreeCont(pWrite->rspRet.rsp);
|
||||
|
|
|
@ -79,6 +79,9 @@ typedef void (*FStopSyncFile)(int32_t vgId, uint64_t fversion);
|
|||
// get file version
|
||||
typedef int32_t (*FGetVersion)(int32_t vgId, uint64_t *fver, uint64_t *vver);
|
||||
|
||||
// reset version
|
||||
typedef int32_t (*FResetVersion)(int32_t vgId, uint64_t fver);
|
||||
|
||||
typedef int32_t (*FSendFile)(void *tsdb, SOCKET socketFd);
|
||||
typedef int32_t (*FRecvFile)(void *tsdb, SOCKET socketFd);
|
||||
|
||||
|
@ -96,6 +99,7 @@ typedef struct {
|
|||
FStartSyncFile startSyncFileFp;
|
||||
FStopSyncFile stopSyncFileFp;
|
||||
FGetVersion getVersionFp;
|
||||
FResetVersion resetVersionFp;
|
||||
FSendFile sendFileFp;
|
||||
FRecvFile recvFileFp;
|
||||
} SSyncInfo;
|
||||
|
@ -108,8 +112,8 @@ void syncCleanUp();
|
|||
int64_t syncStart(const SSyncInfo *);
|
||||
void syncStop(int64_t rid);
|
||||
int32_t syncReconfig(int64_t rid, const SSyncCfg *);
|
||||
int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype);
|
||||
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code);
|
||||
int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype, bool force);
|
||||
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force);
|
||||
void syncRecover(int64_t rid); // recover from other nodes:
|
||||
int32_t syncGetNodesRole(int64_t rid, SNodesRole *);
|
||||
|
||||
|
|
|
@ -65,6 +65,7 @@ void walFsync(twalh, bool forceFsync);
|
|||
int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
|
||||
int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
|
||||
uint64_t walGetVersion(twalh);
|
||||
void walResetVersion(twalh, uint64_t newVer);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -78,7 +78,7 @@ void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite);
|
|||
int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet);
|
||||
|
||||
// vnodeSync
|
||||
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code);
|
||||
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code, bool force);
|
||||
|
||||
// vnodeRead
|
||||
int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam);
|
||||
|
|
|
@ -36,6 +36,7 @@ ELSEIF (TD_DARWIN)
|
|||
LIST(APPEND SRC ./src/shellDarwin.c)
|
||||
LIST(APPEND SRC ./src/shellCommand.c)
|
||||
LIST(APPEND SRC ./src/shellImport.c)
|
||||
LIST(APPEND SRC ./src/shellCheck.c)
|
||||
ADD_EXECUTABLE(shell ${SRC})
|
||||
# linking with dylib
|
||||
TARGET_LINK_LIBRARIES(shell taos)
|
||||
|
|
|
@ -51,6 +51,7 @@ typedef struct SShellArguments {
|
|||
char file[TSDB_FILENAME_LEN];
|
||||
char dir[TSDB_FILENAME_LEN];
|
||||
int threadNum;
|
||||
int check;
|
||||
char* commands;
|
||||
int abort;
|
||||
int port;
|
||||
|
@ -71,6 +72,7 @@ void read_history();
|
|||
void write_history();
|
||||
void source_file(TAOS* con, char* fptr);
|
||||
void source_dir(TAOS* con, SShellArguments* args);
|
||||
void shellCheck(TAOS* con, SShellArguments* args);
|
||||
void get_history_path(char* history);
|
||||
void cleanup_handler(void* arg);
|
||||
void exitShell();
|
||||
|
|
|
@ -0,0 +1,199 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _GNU_SOURCE
|
||||
#define _XOPEN_SOURCE
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "os.h"
|
||||
#include "shell.h"
|
||||
#include "shellCommand.h"
|
||||
#include "tglobal.h"
|
||||
#include "tutil.h"
|
||||
|
||||
#define SHELL_SQL_LEN 1024
|
||||
static int32_t tbNum = 0;
|
||||
static int32_t tbMallocNum = 0;
|
||||
static char ** tbNames = NULL;
|
||||
static int32_t checkedNum = 0;
|
||||
static int32_t errorNum = 0;
|
||||
|
||||
typedef struct {
|
||||
pthread_t threadID;
|
||||
int threadIndex;
|
||||
int totalThreads;
|
||||
void * taos;
|
||||
char * db;
|
||||
} ShellThreadObj;
|
||||
|
||||
static int32_t shellUseDb(TAOS *con, char *db) {
|
||||
if (db == NULL) {
|
||||
fprintf(stdout, "no dbname input\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
char sql[SHELL_SQL_LEN] = {0};
|
||||
snprintf(sql, SHELL_SQL_LEN, "use %s", db);
|
||||
|
||||
TAOS_RES *pSql = taos_query(con, sql);
|
||||
int32_t code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
fprintf(stdout, "failed to execute sql:%s since %s", sql, taos_errstr(pSql));
|
||||
}
|
||||
|
||||
taos_free_result(pSql);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t shellShowTables(TAOS *con, char *db) {
|
||||
char sql[SHELL_SQL_LEN] = {0};
|
||||
snprintf(sql, SHELL_SQL_LEN, "show %s.tables", db);
|
||||
|
||||
TAOS_RES *pSql = taos_query(con, sql);
|
||||
int32_t code = taos_errno(pSql);
|
||||
|
||||
if (code != 0) {
|
||||
fprintf(stdout, "failed to execute sql:%s since %s\n", sql, taos_errstr(pSql));
|
||||
} else {
|
||||
TAOS_ROW row;
|
||||
while ((row = taos_fetch_row(pSql))) {
|
||||
int32_t tbIndex = tbNum++;
|
||||
if (tbMallocNum < tbNum) {
|
||||
tbMallocNum = (tbMallocNum * 2 + 1);
|
||||
tbNames = realloc(tbNames, tbMallocNum * sizeof(char *));
|
||||
if (tbNames == NULL) {
|
||||
fprintf(stdout, "failed to malloc tablenames, num:%d\n", tbMallocNum);
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
tbNames[tbIndex] = malloc(TSDB_TABLE_NAME_LEN);
|
||||
strncpy(tbNames[tbIndex], (const char *)row[0], TSDB_TABLE_NAME_LEN);
|
||||
if (tbIndex % 100000 == 0 && tbIndex != 0) {
|
||||
fprintf(stdout, "%d tablenames fetched\n", tbIndex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taos_free_result(pSql);
|
||||
|
||||
fprintf(stdout, "total %d tablenames fetched, over\n", tbNum);
|
||||
return code;
|
||||
}
|
||||
|
||||
static void shellFreeTbnames() {
|
||||
for (int32_t i = 0; i < tbNum; ++i) {
|
||||
free(tbNames[i]);
|
||||
}
|
||||
free(tbNames);
|
||||
}
|
||||
|
||||
static void *shellCheckThreadFp(void *arg) {
|
||||
ShellThreadObj *pThread = (ShellThreadObj *)arg;
|
||||
|
||||
int32_t interval = tbNum / pThread->totalThreads + 1;
|
||||
int32_t start = pThread->threadIndex * interval;
|
||||
int32_t end = (pThread->threadIndex + 1) * interval;
|
||||
|
||||
if (end > tbNum) end = tbNum + 1;
|
||||
|
||||
char file[32] = {0};
|
||||
snprintf(file, 32, "tb%d.txt", pThread->threadIndex);
|
||||
|
||||
FILE *fp = fopen(file, "w");
|
||||
if (!fp) {
|
||||
fprintf(stdout, "failed to open %s, reason:%s", file, strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
char sql[SHELL_SQL_LEN];
|
||||
for (int32_t t = start; t < end; ++t) {
|
||||
char *tbname = tbNames[t];
|
||||
if (tbname == NULL) break;
|
||||
|
||||
snprintf(sql, SHELL_SQL_LEN, "select * from %s limit 1", tbname);
|
||||
|
||||
TAOS_RES *pSql = taos_query(pThread->taos, sql);
|
||||
int32_t code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
int32_t len = snprintf(sql, SHELL_SQL_LEN, "drop table %s.%s;\n", pThread->db, tbname);
|
||||
fwrite(sql, 1, len, fp);
|
||||
atomic_add_fetch_32(&errorNum, 1);
|
||||
}
|
||||
|
||||
int32_t cnum = atomic_add_fetch_32(&checkedNum, 1);
|
||||
if (cnum % 5000 == 0 && cnum != 0) {
|
||||
fprintf(stdout, "%d tables checked\n", cnum);
|
||||
}
|
||||
|
||||
taos_free_result(pSql);
|
||||
}
|
||||
|
||||
fsync(fileno(fp));
|
||||
fclose(fp);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void shellRunCheckThreads(TAOS *con, SShellArguments *args) {
|
||||
pthread_attr_t thattr;
|
||||
ShellThreadObj *threadObj = (ShellThreadObj *)calloc(args->threadNum, sizeof(ShellThreadObj));
|
||||
for (int t = 0; t < args->threadNum; ++t) {
|
||||
ShellThreadObj *pThread = threadObj + t;
|
||||
pThread->threadIndex = t;
|
||||
pThread->totalThreads = args->threadNum;
|
||||
pThread->taos = con;
|
||||
pThread->db = args->database;
|
||||
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
if (pthread_create(&(pThread->threadID), &thattr, shellCheckThreadFp, (void *)pThread) != 0) {
|
||||
fprintf(stderr, "ERROR: thread:%d failed to start\n", pThread->threadIndex);
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
for (int t = 0; t < args->threadNum; ++t) {
|
||||
pthread_join(threadObj[t].threadID, NULL);
|
||||
}
|
||||
|
||||
for (int t = 0; t < args->threadNum; ++t) {
|
||||
taos_close(threadObj[t].taos);
|
||||
}
|
||||
free(threadObj);
|
||||
}
|
||||
|
||||
void shellCheck(TAOS *con, SShellArguments *args) {
|
||||
int64_t start = taosGetTimestampMs();
|
||||
|
||||
if (shellUseDb(con, args->database) != 0) {
|
||||
shellFreeTbnames();
|
||||
return;
|
||||
}
|
||||
|
||||
if (shellShowTables(con, args->database) != 0) {
|
||||
shellFreeTbnames();
|
||||
return;
|
||||
}
|
||||
|
||||
fprintf(stdout, "total %d tables will be checked by %d threads\n", tbNum, args->threadNum);
|
||||
shellRunCheckThreads(con, args);
|
||||
|
||||
int64_t end = taosGetTimestampMs();
|
||||
fprintf(stdout, "total %d tables checked, failed:%d, time spent %.2f seconds\n", checkedNum, errorNum,
|
||||
(end - start) / 1000.0);
|
||||
}
|
|
@ -121,6 +121,12 @@ TAOS *shellInit(SShellArguments *args) {
|
|||
taos_close(con);
|
||||
exit(EXIT_SUCCESS);
|
||||
}
|
||||
|
||||
if (args->check != 0) {
|
||||
shellCheck(con, args);
|
||||
taos_close(con);
|
||||
exit(EXIT_SUCCESS);
|
||||
}
|
||||
#endif
|
||||
|
||||
return con;
|
||||
|
|
|
@ -45,6 +45,7 @@ static struct argp_option options[] = {
|
|||
{"file", 'f', "FILE", 0, "Script to run without enter the shell."},
|
||||
{"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."},
|
||||
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
|
||||
{"check", 'k', "CHECK", 0, "Check tables."},
|
||||
{"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."},
|
||||
{"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."},
|
||||
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync."},
|
||||
|
@ -130,6 +131,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
|
|||
return -1;
|
||||
}
|
||||
break;
|
||||
case 'k':
|
||||
arguments->check = atoi(arg);
|
||||
break;
|
||||
case 'd':
|
||||
arguments->database = arg;
|
||||
break;
|
||||
|
|
|
@ -680,7 +680,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
|
|||
if (pRow != NULL) {
|
||||
// forward to peers
|
||||
pRow->processedCount = 0;
|
||||
int32_t syncCode = syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC);
|
||||
int32_t syncCode = syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC, false);
|
||||
if (syncCode <= 0) pRow->processedCount = 1;
|
||||
|
||||
if (syncCode < 0) {
|
||||
|
@ -700,7 +700,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
|
|||
actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version);
|
||||
|
||||
// even it is WAL/FWD, it shall be called to update version in sync
|
||||
syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC);
|
||||
syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC, false);
|
||||
|
||||
// from wal or forward msg, row not created, should add into hash
|
||||
if (action == SDB_ACTION_INSERT) {
|
||||
|
@ -1119,7 +1119,7 @@ static void *sdbWorkerFp(void *pWorker) {
|
|||
sdbConfirmForward(1, pRow, pRow->code);
|
||||
} else {
|
||||
if (qtype == TAOS_QTYPE_FWD) {
|
||||
syncConfirmForward(tsSdbMgmt.sync, pRow->pHead.version, pRow->code);
|
||||
syncConfirmForward(tsSdbMgmt.sync, pRow->pHead.version, pRow->code, false);
|
||||
}
|
||||
sdbFreeFromQueue(pRow);
|
||||
}
|
||||
|
|
|
@ -117,6 +117,7 @@ typedef struct SSyncNode {
|
|||
FStartSyncFile startSyncFileFp;
|
||||
FStopSyncFile stopSyncFileFp;
|
||||
FGetVersion getVersionFp;
|
||||
FResetVersion resetVersionFp;
|
||||
FSendFile sendFileFp;
|
||||
FRecvFile recvFileFp;
|
||||
pthread_mutex_t mutex;
|
||||
|
|
|
@ -56,7 +56,7 @@ static void syncMonitorNodeRole(void *param, void *tmrId);
|
|||
static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code);
|
||||
static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle);
|
||||
static void syncRestartPeer(SSyncPeer *pPeer);
|
||||
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtyp);
|
||||
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype, bool force);
|
||||
|
||||
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo);
|
||||
static void syncStartCheckPeerConn(SSyncPeer *pPeer);
|
||||
|
@ -182,6 +182,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
|
|||
pNode->startSyncFileFp = pInfo->startSyncFileFp;
|
||||
pNode->stopSyncFileFp = pInfo->stopSyncFileFp;
|
||||
pNode->getVersionFp = pInfo->getVersionFp;
|
||||
pNode->resetVersionFp = pInfo->resetVersionFp;
|
||||
pNode->sendFileFp = pInfo->sendFileFp;
|
||||
pNode->recvFileFp = pInfo->recvFileFp;
|
||||
|
||||
|
@ -377,24 +378,24 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int32_t qtype) {
|
||||
int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int32_t qtype, bool force) {
|
||||
if (rid <= 0) return 0;
|
||||
|
||||
SSyncNode *pNode = syncAcquireNode(rid);
|
||||
if (pNode == NULL) return 0;
|
||||
|
||||
int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype);
|
||||
int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype, force);
|
||||
|
||||
syncReleaseNode(pNode);
|
||||
return code;
|
||||
}
|
||||
|
||||
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
|
||||
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force) {
|
||||
SSyncNode *pNode = syncAcquireNode(rid);
|
||||
if (pNode == NULL) return;
|
||||
|
||||
SSyncPeer *pPeer = pNode->pMaster;
|
||||
if (pPeer && pNode->quorum > 1) {
|
||||
if (pPeer && (pNode->quorum > 1 || force)) {
|
||||
SFwdRsp rsp;
|
||||
syncBuildSyncFwdRsp(&rsp, pNode->vgId, version, code);
|
||||
|
||||
|
@ -1413,7 +1414,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
|
|||
syncReleaseNode(pNode);
|
||||
}
|
||||
|
||||
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype) {
|
||||
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype, bool force) {
|
||||
SSyncPeer *pPeer;
|
||||
SSyncHead *pSyncHead;
|
||||
SWalHead * pWalHead = data;
|
||||
|
@ -1457,7 +1458,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
|
|||
if (pPeer == NULL || pPeer->peerFd < 0) continue;
|
||||
if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;
|
||||
|
||||
if (pNode->quorum > 1 && code == 0) {
|
||||
if ((pNode->quorum > 1 || force) && code == 0) {
|
||||
code = syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
|
||||
if (code >= 0) code = 1;
|
||||
}
|
||||
|
|
|
@ -238,6 +238,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
|
|||
|
||||
(*pNode->stopSyncFileFp)(pNode->vgId, fversion);
|
||||
nodeVersion = fversion;
|
||||
if (pNode->resetVersionFp) (*pNode->resetVersionFp)(pNode->vgId, fversion);
|
||||
|
||||
sInfo("%s, start to restore wal, fver:%" PRIu64, pPeer->id, nodeVersion);
|
||||
uint64_t wver = 0;
|
||||
|
|
|
@ -30,8 +30,9 @@ void vnodeStopSyncFile(int32_t vgId, uint64_t fversion);
|
|||
void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
|
||||
int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
|
||||
int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver);
|
||||
int32_t vnodeResetVersion(int32_t vgId, uint64_t fver);
|
||||
|
||||
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code);
|
||||
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code, bool force);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -305,6 +305,7 @@ int32_t vnodeOpen(int32_t vgId) {
|
|||
syncInfo.startSyncFileFp = vnodeStartSyncFile;
|
||||
syncInfo.stopSyncFileFp = vnodeStopSyncFile;
|
||||
syncInfo.getVersionFp = vnodeGetVersion;
|
||||
syncInfo.resetVersionFp = vnodeResetVersion;
|
||||
syncInfo.sendFileFp = tsdbSyncSend;
|
||||
syncInfo.recvFileFp = tsdbSyncRecv;
|
||||
syncInfo.pTsdb = pVnode->tsdb;
|
||||
|
|
|
@ -158,7 +158,23 @@ int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) {
|
|||
return code;
|
||||
}
|
||||
|
||||
void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code) {
|
||||
SVnodeObj *pVnode = vparam;
|
||||
syncConfirmForward(pVnode->sync, version, code);
|
||||
int32_t vnodeResetVersion(int32_t vgId, uint64_t fver) {
|
||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||
if (pVnode == NULL) {
|
||||
vError("vgId:%d, vnode not found while reset version", vgId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pVnode->fversion = fver;
|
||||
pVnode->version = fver;
|
||||
walResetVersion(pVnode->wal, fver);
|
||||
vDebug("vgId:%d, version reset to %" PRIu64, vgId, fver);
|
||||
|
||||
vnodeRelease(pVnode);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code, bool force) {
|
||||
SVnodeObj *pVnode = vparam;
|
||||
syncConfirmForward(pVnode->sync, version, code, force);
|
||||
}
|
|
@ -89,7 +89,8 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
|
|||
|
||||
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync
|
||||
int32_t syncCode = 0;
|
||||
syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype);
|
||||
bool force = (pWrite == NULL ? false : pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
|
||||
syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype, force);
|
||||
if (syncCode < 0) return syncCode;
|
||||
|
||||
// write into WAL
|
||||
|
|
|
@ -446,3 +446,16 @@ uint64_t walGetVersion(twalh param) {
|
|||
|
||||
return pWal->version;
|
||||
}
|
||||
|
||||
// Wal version in slave (dnode1) must be reset.
|
||||
// Because after the data file is recovered from peer (dnode2), the new file version in dnode1 may become smaller than origin.
|
||||
// Some new wal record cannot be written to the wal file in dnode1 for wal version not reset, then fversion and the record in wal file may inconsistent,
|
||||
// At this time, if dnode2 down, dnode1 switched to master. After dnode2 start and restore data from dnode1, data loss will occur
|
||||
|
||||
void walResetVersion(twalh param, uint64_t newVer) {
|
||||
SWal *pWal = param;
|
||||
if (pWal == 0) return;
|
||||
wDebug("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->version, newVer);
|
||||
|
||||
pWal->version = newVer;
|
||||
}
|
|
@ -0,0 +1,556 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/deploy.sh -n dnode2 -i 2
|
||||
system sh/deploy.sh -n dnode3 -i 3
|
||||
system sh/deploy.sh -n dnode4 -i 4
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
|
||||
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 1
|
||||
system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1
|
||||
system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c role -v 1
|
||||
system sh/cfg.sh -n dnode2 -c role -v 2
|
||||
system sh/cfg.sh -n dnode3 -c role -v 2
|
||||
system sh/cfg.sh -n dnode4 -c role -v 2
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator
|
||||
system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator
|
||||
system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator
|
||||
system sh/cfg.sh -n dnode4 -c arbitrator -v $arbitrator
|
||||
|
||||
print ============== step0: start tarbitrator
|
||||
system sh/exec_tarbitrator.sh -s start
|
||||
|
||||
print ============== step1: start dnode1, only deploy mnode
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sql connect
|
||||
|
||||
print ============== step2: start dnode2/dnode3
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
system sh/exec.sh -n dnode3 -s start
|
||||
sql create dnode $hostname2
|
||||
sql create dnode $hostname3
|
||||
|
||||
$x = 0
|
||||
step2:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show dnodes
|
||||
print dnode1 $data4_1
|
||||
print dnode2 $data4_2
|
||||
print dnode3 $data4_3
|
||||
|
||||
if $data4_1 != ready then
|
||||
goto step2
|
||||
endi
|
||||
if $data4_2 != ready then
|
||||
goto step2
|
||||
endi
|
||||
if $data4_3 != ready then
|
||||
goto step2
|
||||
endi
|
||||
|
||||
sleep 1000
|
||||
|
||||
print ============== step3
|
||||
sql create database db replica 2
|
||||
sql use db
|
||||
|
||||
sql create table stb (ts timestamp, c1 int, c2 int) tags(t1 int)
|
||||
sql create table t1 using stb tags(1)
|
||||
sql insert into t1 values(1577980800000, 1, 5)
|
||||
sql insert into t1 values(1577980800001, 2, 4)
|
||||
sql insert into t1 values(1577980800002, 3, 3)
|
||||
sql insert into t1 values(1577980800003, 4, 2)
|
||||
sql insert into t1 values(1577980800004, 5, 1)
|
||||
|
||||
sql show db.vgroups
|
||||
if $data04 != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data06 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data05 != master then
|
||||
return -1
|
||||
endi
|
||||
if $data07 != slave then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from t1
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGKILL
|
||||
system sh/exec.sh -n dnode3 -s stop -x SIGKILL
|
||||
|
||||
print ============== step4
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
system sh/exec.sh -n dnode3 -s start
|
||||
|
||||
$x = 0
|
||||
step4:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show dnodes
|
||||
print dnode1 $data4_1
|
||||
print dnode2 $data4_2
|
||||
print dnode3 $data4_3
|
||||
|
||||
if $data4_1 != ready then
|
||||
goto step4
|
||||
endi
|
||||
if $data4_2 != ready then
|
||||
goto step4
|
||||
endi
|
||||
if $data4_3 != ready then
|
||||
goto step4
|
||||
endi
|
||||
|
||||
sql show db.vgroups
|
||||
if $data04 != 3 then
|
||||
goto step4
|
||||
endi
|
||||
if $data06 != 2 then
|
||||
goto step4
|
||||
endi
|
||||
if $data05 != master then
|
||||
goto step4
|
||||
endi
|
||||
if $data07 != slave then
|
||||
goto step4
|
||||
endi
|
||||
|
||||
sql create table t2 using stb tags(1)
|
||||
sql insert into t2 values(1577980800000, 1, 5)
|
||||
sql insert into t2 values(1577980800001, 2, 4)
|
||||
sql insert into t2 values(1577980800002, 3, 3)
|
||||
sql insert into t2 values(1577980800003, 4, 2)
|
||||
sql insert into t2 values(1577980800004, 5, 1)
|
||||
|
||||
sql select * from t2
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print ============== step5
|
||||
system sh/exec.sh -n dnode3 -s stop -x SIGKILL
|
||||
|
||||
$x = 0
|
||||
step5:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show dnodes
|
||||
print dnode1 $data4_1
|
||||
print dnode2 $data4_2
|
||||
print dnode3 $data4_3
|
||||
|
||||
if $data4_1 != ready then
|
||||
goto step5
|
||||
endi
|
||||
if $data4_2 != ready then
|
||||
goto step5
|
||||
endi
|
||||
if $data4_3 != offline then
|
||||
goto step5
|
||||
endi
|
||||
|
||||
sql select * from t1
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t2
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show db.vgroups
|
||||
if $data04 != 3 then
|
||||
goto step5
|
||||
endi
|
||||
if $data06 != 2 then
|
||||
goto step5
|
||||
endi
|
||||
if $data05 != offline then
|
||||
goto step5
|
||||
endi
|
||||
if $data07 != master then
|
||||
goto step5
|
||||
endi
|
||||
|
||||
print ============== step6
|
||||
sql create table t3 using stb tags(1)
|
||||
sql insert into t3 values(1577980800000, 1, 5)
|
||||
sql insert into t3 values(1577980800001, 2, 4)
|
||||
sql insert into t3 values(1577980800002, 3, 3)
|
||||
sql insert into t3 values(1577980800003, 4, 2)
|
||||
sql insert into t3 values(1577980800004, 5, 1)
|
||||
sql insert into t3 values(1577980800010, 11, 5)
|
||||
sql insert into t3 values(1577980800011, 12, 4)
|
||||
sql insert into t3 values(1577980800012, 13, 3)
|
||||
sql insert into t3 values(1577980800013, 14, 2)
|
||||
sql insert into t3 values(1577980800014, 15, 1)
|
||||
|
||||
sql select * from t1
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t2
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t3
|
||||
if $rows != 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode3 -s start
|
||||
|
||||
$x = 0
|
||||
step6:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show dnodes
|
||||
print dnode1 $data4_1
|
||||
print dnode2 $data4_2
|
||||
print dnode3 $data4_3
|
||||
|
||||
if $data4_1 != ready then
|
||||
goto step6
|
||||
endi
|
||||
if $data4_2 != ready then
|
||||
goto step6
|
||||
endi
|
||||
if $data4_3 != ready then
|
||||
goto step6
|
||||
endi
|
||||
|
||||
sql show db.vgroups
|
||||
if $data04 != 3 then
|
||||
goto step6
|
||||
endi
|
||||
if $data06 != 2 then
|
||||
goto step6
|
||||
endi
|
||||
if $data05 != slave then
|
||||
goto step6
|
||||
endi
|
||||
if $data07 != master then
|
||||
goto step6
|
||||
endi
|
||||
|
||||
sql select * from t1
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t2
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t3
|
||||
if $rows != 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print ============== step7
|
||||
sql create table t4 using stb tags(1)
|
||||
sql insert into t4 values(1577980800000, 1, 5)
|
||||
sql insert into t4 values(1577980800001, 2, 4)
|
||||
sql insert into t4 values(1577980800002, 3, 3)
|
||||
sql insert into t4 values(1577980800003, 4, 2)
|
||||
sql insert into t4 values(1577980800004, 5, 1)
|
||||
sql insert into t4 values(1577980800010, 11, 5)
|
||||
sql insert into t4 values(1577980800011, 12, 4)
|
||||
sql insert into t4 values(1577980800012, 13, 3)
|
||||
sql insert into t4 values(1577980800013, 14, 2)
|
||||
sql insert into t4 values(1577980800014, 15, 1)
|
||||
sql insert into t4 values(1577980800020, 21, 5)
|
||||
sql insert into t4 values(1577980800021, 22, 4)
|
||||
sql insert into t4 values(1577980800022, 23, 3)
|
||||
sql insert into t4 values(1577980800023, 24, 2)
|
||||
sql insert into t4 values(1577980800024, 25, 1)
|
||||
|
||||
sql select * from t1
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t2
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t3
|
||||
if $rows != 10 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t4
|
||||
if $rows != 15 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGKILL
|
||||
$x = 0
|
||||
step7:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show dnodes
|
||||
print dnode1 $data4_1
|
||||
print dnode2 $data4_2
|
||||
print dnode3 $data4_3
|
||||
|
||||
if $data4_1 != ready then
|
||||
goto step7
|
||||
endi
|
||||
if $data4_2 != offline then
|
||||
goto step7
|
||||
endi
|
||||
if $data4_3 != ready then
|
||||
goto step7
|
||||
endi
|
||||
|
||||
sql show db.vgroups
|
||||
if $data04 != 3 then
|
||||
goto step7
|
||||
endi
|
||||
if $data06 != 2 then
|
||||
goto step7
|
||||
endi
|
||||
if $data05 != master then
|
||||
goto step7
|
||||
endi
|
||||
if $data07 != offline then
|
||||
goto step7
|
||||
endi
|
||||
|
||||
sql select * from t1
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t2
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t3
|
||||
if $rows != 10 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t4
|
||||
if $rows != 15 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print ============== step8
|
||||
sql create table t5 using stb tags(1)
|
||||
sql insert into t5 values(1577980800000, 1, 5)
|
||||
sql insert into t5 values(1577980800001, 2, 4)
|
||||
sql insert into t5 values(1577980800002, 3, 3)
|
||||
sql insert into t5 values(1577980800003, 4, 2)
|
||||
sql insert into t5 values(1577980800004, 5, 1)
|
||||
sql insert into t5 values(1577980800010, 11, 5)
|
||||
|
||||
sql select * from t1
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t2
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t3
|
||||
if $rows != 10 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t4
|
||||
if $rows != 15 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t5
|
||||
if $rows != 6 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
$x = 0
|
||||
step8:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show dnodes
|
||||
print dnode1 $data4_1
|
||||
print dnode2 $data4_2
|
||||
print dnode3 $data4_3
|
||||
|
||||
if $data4_1 != ready then
|
||||
goto step8
|
||||
endi
|
||||
if $data4_2 != ready then
|
||||
goto step8
|
||||
endi
|
||||
if $data4_3 != ready then
|
||||
goto step8
|
||||
endi
|
||||
|
||||
sql show db.vgroups
|
||||
if $data04 != 3 then
|
||||
goto step8
|
||||
endi
|
||||
if $data06 != 2 then
|
||||
goto step8
|
||||
endi
|
||||
if $data05 != master then
|
||||
goto step8
|
||||
endi
|
||||
if $data07 != slave then
|
||||
goto step8
|
||||
endi
|
||||
|
||||
sql select * from t1
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t2
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t3
|
||||
if $rows != 10 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t4
|
||||
if $rows != 15 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t5
|
||||
if $rows != 6 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print ============== step9
|
||||
sql create table t6 using stb tags(1)
|
||||
sql insert into t6 values(1577980800000, 1, 5)
|
||||
sql insert into t6 values(1577980800001, 2, 4)
|
||||
sql insert into t6 values(1577980800002, 3, 3)
|
||||
sql insert into t6 values(1577980800003, 4, 2)
|
||||
sql insert into t6 values(1577980800004, 5, 1)
|
||||
sql insert into t6 values(1577980800010, 11, 5)
|
||||
sql insert into t6 values(1577980800011, 12, 4)
|
||||
|
||||
sql select * from t1
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t2
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t3
|
||||
if $rows != 10 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t4
|
||||
if $rows != 15 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t5
|
||||
if $rows != 6 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t6
|
||||
if $rows != 7 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode3 -s stop -x SIGKILL
|
||||
$x = 0
|
||||
step9:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show dnodes
|
||||
print dnode1 $data4_1
|
||||
print dnode2 $data4_2
|
||||
print dnode3 $data4_3
|
||||
|
||||
if $data4_1 != ready then
|
||||
goto step9
|
||||
endi
|
||||
if $data4_2 != ready then
|
||||
goto step9
|
||||
endi
|
||||
if $data4_3 != offline then
|
||||
goto step9
|
||||
endi
|
||||
|
||||
print ============== 2
|
||||
sql show db.vgroups
|
||||
|
||||
if $data04 != 3 then
|
||||
goto step7
|
||||
endi
|
||||
if $data06 != 2 then
|
||||
goto step7
|
||||
endi
|
||||
if $data05 != offline then
|
||||
goto step7
|
||||
endi
|
||||
if $data07 != master then
|
||||
goto step7
|
||||
endi
|
||||
|
||||
print ============== 3
|
||||
sql select * from t1
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t2
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t3
|
||||
if $rows != 10 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t4
|
||||
if $rows != 15 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t5
|
||||
if $rows != 6 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from t6
|
||||
if $rows != 7 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop
|
||||
system sh/exec.sh -n dnode2 -s stop
|
||||
system sh/exec.sh -n dnode3 -s stop
|
Loading…
Reference in New Issue