commit
6a4481f490
|
@ -200,7 +200,7 @@ int32_t tsNumOfLogLines = 10000000;
|
|||
int32_t mDebugFlag = 135;
|
||||
int32_t sdbDebugFlag = 135;
|
||||
int32_t dDebugFlag = 135;
|
||||
int32_t vDebugFlag = 131;
|
||||
int32_t vDebugFlag = 135;
|
||||
int32_t cDebugFlag = 131;
|
||||
int32_t jniDebugFlag = 131;
|
||||
int32_t odbcDebugFlag = 131;
|
||||
|
|
|
@ -261,6 +261,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sy
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_VND_COMMITING, 0, 0x0904, "Vnode is commiting")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_FILE_CHNAGED, 0, 0x0905, "Vnode file is changed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_APP_ERROR, 0, 0x1000, "Unexpected generic error in sync")
|
||||
|
||||
// wal
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
|
||||
|
@ -367,6 +370,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_TAG_VALUE_TOO_LONG, 0, 0x11A4, "tag value
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_NULL, 0, 0x11A5, "value not find")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_TYPE, 0, 0x11A6, "value type should be boolean, number or string")
|
||||
|
||||
// odbc
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_OOM, 0, 0x2100, "out of memory")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_CHAR_NOT_NUM, 0, 0x2101, "convertion not a valid literal input")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_UNDEF, 0, 0x2102, "convertion undefined")
|
||||
|
@ -390,7 +394,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_SRC_BAD_SEQ, 0, 0x2113, "src bad se
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_SRC_INCOMPLETE, 0, 0x2114, "src incomplete")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_SRC_GENERAL, 0, 0x2115, "src general")
|
||||
|
||||
|
||||
#ifdef TAOS_ERROR_C
|
||||
};
|
||||
#endif
|
||||
|
|
|
@ -85,6 +85,9 @@ typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level);
|
|||
// when data file is synced successfully, notity app
|
||||
typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion);
|
||||
|
||||
// get file version
|
||||
typedef int32_t (*FGetFileVersion)(int32_t vgId, uint64_t *fver);
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId; // vgroup ID
|
||||
uint64_t version; // initial version
|
||||
|
@ -97,6 +100,7 @@ typedef struct {
|
|||
FNotifyRole notifyRole;
|
||||
FNotifyFlowCtrl notifyFlowCtrl;
|
||||
FNotifyFileSynced notifyFileSynced;
|
||||
FGetFileVersion getFileVersion;
|
||||
} SSyncInfo;
|
||||
|
||||
typedef void *tsync_h;
|
||||
|
|
|
@ -139,6 +139,7 @@ typedef struct SsyncPeer {
|
|||
char id[TSDB_EP_LEN + 32]; // peer vgId + end point
|
||||
uint64_t version;
|
||||
uint64_t sversion; // track the peer version in retrieve process
|
||||
uint64_t lastVer; // track the file version while retrieve
|
||||
int32_t syncFd;
|
||||
int32_t peerFd; // forward FD
|
||||
int32_t numOfRetrieves; // number of retrieves tried
|
||||
|
@ -172,6 +173,7 @@ typedef struct SSyncNode {
|
|||
FNotifyRole notifyRole;
|
||||
FNotifyFlowCtrl notifyFlowCtrl;
|
||||
FNotifyFileSynced notifyFileSynced;
|
||||
FGetFileVersion getFileVersion;
|
||||
pthread_mutex_t mutex;
|
||||
} SSyncNode;
|
||||
|
||||
|
|
|
@ -196,6 +196,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
|
|||
pNode->confirmForward = pInfo->confirmForward;
|
||||
pNode->notifyFlowCtrl = pInfo->notifyFlowCtrl;
|
||||
pNode->notifyFileSynced = pInfo->notifyFileSynced;
|
||||
pNode->getFileVersion = pInfo->getFileVersion;
|
||||
|
||||
pNode->selfIndex = -1;
|
||||
pNode->vgId = pInfo->vgId;
|
||||
|
@ -540,7 +541,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
|
|||
pPeer->ip = ip;
|
||||
pPeer->port = pInfo->nodePort;
|
||||
pPeer->fqdn[sizeof(pPeer->fqdn) - 1] = 0;
|
||||
snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d, peer:%s:%u", pNode->vgId, pPeer->fqdn, pPeer->port);
|
||||
snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d, nodeId:%d", pNode->vgId, pPeer->nodeId);
|
||||
|
||||
pPeer->peerFd = -1;
|
||||
pPeer->syncFd = -1;
|
||||
|
@ -1143,8 +1144,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
|
|||
pPeer->syncFd = connFd;
|
||||
syncCreateRestoreDataThread(pPeer);
|
||||
} else {
|
||||
sDebug("%s, TCP connection is already up(pfd:%d), close one, new pfd:%d sfd:%d", pPeer->id, pPeer->peerFd, connFd,
|
||||
pPeer->syncFd);
|
||||
sDebug("%s, TCP connection is up, pfd:%d sfd:%d, old pfd:%d", pPeer->id, connFd, pPeer->syncFd, pPeer->peerFd);
|
||||
syncClosePeerConn(pPeer);
|
||||
pPeer->peerFd = connFd;
|
||||
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
|
||||
|
|
|
@ -52,12 +52,12 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex
|
|||
|
||||
static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
|
||||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
SFileInfo minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */ // master file info
|
||||
SFileInfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */ // slave file info
|
||||
SFileAck fileAck;
|
||||
SFileInfo minfo; memset(&minfo, 0, sizeof(SFileInfo)); /* = {0}; */
|
||||
SFileInfo sinfo; memset(&sinfo, 0, sizeof(SFileInfo)); /* = {0}; */
|
||||
SFileAck fileAck = {0};
|
||||
int32_t code = -1;
|
||||
char name[TSDB_FILENAME_LEN * 2] = {0};
|
||||
uint32_t pindex = 0; // index in last restore
|
||||
uint32_t pindex = 0; // index in last restore
|
||||
bool fileChanged = false;
|
||||
|
||||
*fversion = 0;
|
||||
|
@ -134,7 +134,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
|
|||
// data file is changed, code shall be set to 1
|
||||
*fversion = minfo.fversion;
|
||||
code = 1;
|
||||
sDebug("%s, file changed while restore file", pPeer->id);
|
||||
sDebug("%s, file changed after restore file, fver:%" PRIu64, pPeer->id, *fversion);
|
||||
}
|
||||
|
||||
if (code < 0) {
|
||||
|
@ -160,7 +160,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
|
|||
}
|
||||
|
||||
if (pHead->len == 0) {
|
||||
sDebug("%s, wal is synced over", pPeer->id);
|
||||
sDebug("%s, wal is synced over, last wver:%" PRIu64, pPeer->id, lastVer);
|
||||
code = 0;
|
||||
break;
|
||||
} // wal sync over
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include <sys/inotify.h>
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "tlog.h"
|
||||
#include "tutil.h"
|
||||
#include "tglobal.h"
|
||||
|
@ -25,85 +26,35 @@
|
|||
#include "tsync.h"
|
||||
#include "syncInt.h"
|
||||
|
||||
static int32_t syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
|
||||
sDebug("%s, start to monitor:%s", pPeer->id, name);
|
||||
static int32_t syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
|
||||
if (pNode->getFileVersion == NULL) return TSDB_CODE_SUCCESS;
|
||||
|
||||
if (pPeer->notifyFd <= 0) {
|
||||
pPeer->watchNum = 0;
|
||||
pPeer->notifyFd = inotify_init1(IN_NONBLOCK);
|
||||
if (pPeer->notifyFd < 0) {
|
||||
sError("%s, failed to init inotify since %s", pPeer->id, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int32_t) * tsMaxWatchFiles);
|
||||
if (pPeer->watchFd == NULL) {
|
||||
sError("%s, failed to allocate watchFd", pPeer->id);
|
||||
return -1;
|
||||
}
|
||||
|
||||
memset(pPeer->watchFd, -1, sizeof(int32_t) * tsMaxWatchFiles);
|
||||
uint64_t fver = 0;
|
||||
int32_t code = (*pNode->getFileVersion)(pNode->vgId, &fver);
|
||||
if (code != 0) {
|
||||
sInfo("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastVer);
|
||||
pPeer->fileChanged = 1;
|
||||
return TSDB_CODE_SYN_VND_COMMITING;
|
||||
}
|
||||
|
||||
int32_t *wd = pPeer->watchFd + pPeer->watchNum;
|
||||
|
||||
if (*wd >= 0) {
|
||||
if (inotify_rm_watch(pPeer->notifyFd, *wd) < 0) {
|
||||
sError("%s, failed to remove wd:%d since %s", pPeer->id, *wd, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
if (fver != pPeer->lastVer) {
|
||||
sInfo("%s, files are modified while retrieve, fver:%" PRIu64 ", last fver:%" PRIu64, pPeer->id, fver, pPeer->lastVer);
|
||||
pPeer->fileChanged = 1;
|
||||
return TSDB_CODE_SYN_FILE_CHNAGED;
|
||||
}
|
||||
|
||||
*wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_DELETE);
|
||||
if (*wd == -1) {
|
||||
sError("%s, failed to add %s since %s", pPeer->id, name, strerror(errno));
|
||||
return -1;
|
||||
} else {
|
||||
sDebug("%s, monitor %s, wd:%d watchNum:%d", pPeer->id, name, *wd, pPeer->watchNum);
|
||||
}
|
||||
|
||||
pPeer->watchNum = (pPeer->watchNum + 1) % tsMaxWatchFiles;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t syncAreFilesModified(SSyncPeer *pPeer) {
|
||||
if (pPeer->notifyFd <= 0) return 0;
|
||||
|
||||
char buf[2048];
|
||||
int32_t len = read(pPeer->notifyFd, buf, sizeof(buf));
|
||||
if (len < 0 && errno != EAGAIN) {
|
||||
sError("%s, failed to read notify FD since %s", pPeer->id, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
if (len > 0) {
|
||||
const struct inotify_event *event;
|
||||
char *ptr;
|
||||
for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
|
||||
event = (const struct inotify_event *)ptr;
|
||||
if ((event->mask & IN_MODIFY) || (event->mask & IN_DELETE)) {
|
||||
sDebug("%s, processed file is changed", pPeer->id);
|
||||
pPeer->fileChanged = 1;
|
||||
code = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
pPeer->fileChanged = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
||||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
SFileInfo fileInfo;
|
||||
SFileAck fileAck;
|
||||
int32_t code = -1;
|
||||
SFileInfo fileInfo; memset(&fileInfo, 0, sizeof(SFileInfo));
|
||||
SFileAck fileAck = {0};
|
||||
int32_t code = TSDB_CODE_SYN_APP_ERROR;
|
||||
char name[TSDB_FILENAME_LEN * 2] = {0};
|
||||
|
||||
memset(&fileInfo, 0, sizeof(fileInfo));
|
||||
memset(&fileAck, 0, sizeof(fileAck));
|
||||
if (pNode->getFileVersion) (*pNode->getFileVersion)(pNode->vgId, &pPeer->lastVer);
|
||||
|
||||
while (1) {
|
||||
// retrieve file info
|
||||
|
@ -111,24 +62,27 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
|||
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
|
||||
&fileInfo.size, &fileInfo.fversion);
|
||||
// fileInfo.size = htonl(size);
|
||||
sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size);
|
||||
|
||||
// send the file info
|
||||
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo));
|
||||
if (ret < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
||||
break;
|
||||
}
|
||||
|
||||
// if no file anymore, break
|
||||
if (fileInfo.magic == 0 || fileInfo.name[0] == 0) {
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
sDebug("%s, no more files to sync", pPeer->id);
|
||||
code = 0;
|
||||
break;
|
||||
}
|
||||
|
||||
// wait for the ack from peer
|
||||
ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck));
|
||||
if (ret < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
||||
break;
|
||||
}
|
||||
|
@ -136,15 +90,6 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
|||
// set the peer sync version
|
||||
pPeer->sversion = fileInfo.fversion;
|
||||
|
||||
// get the full path to file
|
||||
snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name);
|
||||
|
||||
// add the file into watch list
|
||||
if (syncAddIntoWatchList(pPeer, name) < 0) {
|
||||
sError("%s, failed to watch file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
||||
break;
|
||||
}
|
||||
|
||||
// if sync is not required, continue
|
||||
if (fileAck.sync == 0) {
|
||||
fileInfo.index++;
|
||||
|
@ -152,9 +97,13 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
|||
continue;
|
||||
}
|
||||
|
||||
// get the full path to file
|
||||
snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name);
|
||||
|
||||
// send the file to peer
|
||||
int32_t sfd = open(name, O_RDONLY);
|
||||
if (sfd < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
sError("%s, failed to open file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
||||
break;
|
||||
}
|
||||
|
@ -162,22 +111,21 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
|||
ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
|
||||
close(sfd);
|
||||
if (ret < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
sError("%s, failed to send file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
||||
break;
|
||||
}
|
||||
|
||||
sDebug("%s, %s is sent, size:%" PRId64, pPeer->id, name, fileInfo.size);
|
||||
sDebug("%s, file:%s is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size);
|
||||
fileInfo.index++;
|
||||
|
||||
// check if processed files are modified
|
||||
if (syncAreFilesModified(pPeer) != 0) {
|
||||
sInfo("%s, file:%s are modified while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
||||
break;
|
||||
}
|
||||
code = syncAreFilesModified(pNode, pPeer);
|
||||
if (code != TSDB_CODE_SUCCESS) break;
|
||||
}
|
||||
|
||||
if (code < 0) {
|
||||
sError("%s, failed to retrieve file", pPeer->id);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sError("%s, failed to retrieve file since %s", pPeer->id, tstrerror(code));
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -308,9 +256,9 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
|
|||
static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) {
|
||||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
int32_t code = -1;
|
||||
char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file
|
||||
char fname[TSDB_FILENAME_LEN * 2] = {0}; // full path to wal file
|
||||
|
||||
if (syncAreFilesModified(pPeer) != 0) return -1;
|
||||
if (syncAreFilesModified(pNode, pPeer) != 0) return -1;
|
||||
|
||||
while (1) {
|
||||
int32_t once = 0; // last WAL has once ever been processed
|
||||
|
@ -429,9 +377,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
|
|||
close(sfd);
|
||||
if (code < 0) break;
|
||||
|
||||
index++;
|
||||
|
||||
if (syncAreFilesModified(pPeer) != 0) break;
|
||||
if (syncAreFilesModified(pNode, pPeer) != 0) break;
|
||||
}
|
||||
|
||||
if (code == 0) {
|
||||
|
@ -481,18 +427,18 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
|
|||
|
||||
pPeer->sversion = 0;
|
||||
pPeer->sstatus = TAOS_SYNC_STATUS_FILE;
|
||||
sInfo("%s, start to retrieve file, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
|
||||
sInfo("%s, start to retrieve files, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
|
||||
if (syncRetrieveFile(pPeer) < 0) {
|
||||
sError("%s, failed to retrieve file", pPeer->id);
|
||||
sError("%s, failed to retrieve files", pPeer->id);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// if no files are synced, there must be wal to sync, sversion must be larger than one
|
||||
if (pPeer->sversion == 0) pPeer->sversion = 1;
|
||||
|
||||
sInfo("%s, start to retrieve wal", pPeer->id);
|
||||
sInfo("%s, start to retrieve wals", pPeer->id);
|
||||
if (syncRetrieveWal(pPeer) < 0) {
|
||||
sError("%s, failed to retrieve wal", pPeer->id);
|
||||
sError("%s, failed to retrieve wals", pPeer->id);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -506,7 +452,6 @@ void *syncRetrieveData(void *param) {
|
|||
|
||||
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves);
|
||||
|
||||
pPeer->fileChanged = 0;
|
||||
pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
|
||||
if (pPeer->syncFd < 0) {
|
||||
sError("%s, failed to open socket to sync", pPeer->id);
|
||||
|
|
|
@ -44,6 +44,7 @@ typedef struct {
|
|||
int8_t role;
|
||||
int8_t accessState;
|
||||
int8_t isFull;
|
||||
int8_t isCommiting;
|
||||
uint64_t version; // current version
|
||||
uint64_t fversion; // version on saved data file
|
||||
void *wqueue;
|
||||
|
|
|
@ -38,6 +38,7 @@ static void vnodeCtrlFlow(int32_t vgId, int32_t level);
|
|||
static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion);
|
||||
static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
|
||||
static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
|
||||
static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver);
|
||||
|
||||
#ifndef _SYNC
|
||||
int64_t syncStart(const SSyncInfo *info) { return NULL; }
|
||||
|
@ -352,6 +353,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|||
syncInfo.notifyRole = vnodeNotifyRole;
|
||||
syncInfo.notifyFlowCtrl = vnodeCtrlFlow;
|
||||
syncInfo.notifyFileSynced = vnodeNotifyFileSynced;
|
||||
syncInfo.getFileVersion = vnodeGetFileVersion;
|
||||
pVnode->sync = syncStart(&syncInfo);
|
||||
|
||||
#ifndef _SYNC
|
||||
|
@ -596,18 +598,19 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
|
|||
vnodeRelease(pVnode);
|
||||
}
|
||||
|
||||
// TODO: this is a simple implement
|
||||
static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
|
||||
SVnodeObj *pVnode = arg;
|
||||
|
||||
if (eno != TSDB_CODE_SUCCESS) {
|
||||
vError("vgId:%d, failed to commit since %s, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, tstrerror(eno),
|
||||
pVnode->fversion, pVnode->version);
|
||||
pVnode->isCommiting = 0;
|
||||
pVnode->isFull = 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (status == TSDB_STATUS_COMMIT_START) {
|
||||
pVnode->isCommiting = 1;
|
||||
pVnode->fversion = pVnode->version;
|
||||
vDebug("vgId:%d, start commit, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
|
||||
if (pVnode->status != TAOS_VN_STATUS_INIT) {
|
||||
|
@ -618,6 +621,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
|
|||
|
||||
if (status == TSDB_STATUS_COMMIT_OVER) {
|
||||
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
|
||||
pVnode->isCommiting = 0;
|
||||
pVnode->isFull = 0;
|
||||
if (pVnode->status != TAOS_VN_STATUS_INIT) {
|
||||
walRemoveOneOldFile(pVnode->wal);
|
||||
|
@ -683,8 +687,10 @@ static void vnodeCtrlFlow(int32_t vgId, int32_t level) {
|
|||
return;
|
||||
}
|
||||
|
||||
pVnode->flowctrlLevel = level;
|
||||
vDebug("vgId:%d, set flowctrl level:%d", pVnode->vgId, level);
|
||||
if (pVnode->flowctrlLevel != level) {
|
||||
vDebug("vgId:%d, set flowctrl level from %d to %d", pVnode->vgId, pVnode->flowctrlLevel, level);
|
||||
pVnode->flowctrlLevel = level;
|
||||
}
|
||||
|
||||
vnodeRelease(pVnode);
|
||||
}
|
||||
|
@ -764,3 +770,22 @@ static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void
|
|||
vnodeRelease(pVnode);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver) {
|
||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||
if (pVnode == NULL) {
|
||||
vError("vgId:%d, vnode not found while write to cache", vgId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
if (pVnode->isCommiting) {
|
||||
vDebug("vgId:%d, vnode is commiting while get file version", vgId);
|
||||
code = -1;
|
||||
} else {
|
||||
*fver = pVnode->fversion;
|
||||
}
|
||||
|
||||
vnodeRelease(pVnode);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -282,13 +282,15 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
|
|||
|
||||
pWrite->processedCount++;
|
||||
if (pWrite->processedCount > 100) {
|
||||
vError("vgId:%d, msg:%p, failed to process since %s", pVnode->vgId, pWrite, tstrerror(code));
|
||||
vError("vgId:%d, msg:%p, failed to process since %s, retry:%d", pVnode->vgId, pWrite, tstrerror(code),
|
||||
pWrite->processedCount);
|
||||
pWrite->processedCount = 1;
|
||||
dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code);
|
||||
} else {
|
||||
code = vnodePerformFlowCtrl(pWrite);
|
||||
if (code == 0) {
|
||||
vTrace("vgId:%d, write into vwqueue after flowctrl", pVnode->vgId);
|
||||
vDebug("vgId:%d, msg:%p, write into vwqueue after flowctrl, retry:%d", pVnode->vgId, pWrite,
|
||||
pWrite->processedCount);
|
||||
pWrite->processedCount = 0;
|
||||
taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite);
|
||||
}
|
||||
|
@ -310,7 +312,7 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
|
|||
void *unUsed = NULL;
|
||||
taosTmrReset(vnodeFlowCtrlMsgToWQueue, 100, pWrite, tsDnodeTmr, &unUsed);
|
||||
|
||||
vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl, count:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle,
|
||||
vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl, retry:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle,
|
||||
pWrite->processedCount);
|
||||
return TSDB_CODE_VND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
|
|
@ -211,7 +211,7 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
|
|||
code = (*fileId == pWal->fileId) ? 0 : 1;
|
||||
}
|
||||
|
||||
wTrace("vgId:%d, get wal file, code:%d curId:%" PRId64 " outId:%" PRId64, pWal->vgId, code, pWal->fileId, *fileId);
|
||||
wDebug("vgId:%d, get wal file, code:%d curId:%" PRId64 " outId:%" PRId64, pWal->vgId, code, pWal->fileId, *fileId);
|
||||
pthread_mutex_unlock(&(pWal->mutex));
|
||||
|
||||
return code;
|
||||
|
@ -325,7 +325,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
|
|||
|
||||
offset = offset + sizeof(SWalHead) + pHead->len;
|
||||
|
||||
wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d", pWal->vgId,
|
||||
wDebug("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d", pWal->vgId,
|
||||
fileId, pHead->version, pWal->version, pHead->len);
|
||||
|
||||
pWal->version = pHead->version;
|
||||
|
|
|
@ -82,6 +82,7 @@ restful d1 table_rest 1591772800 30000
|
|||
restful d1 table_rest 1591872800 30000
|
||||
restful d1 table_rest 1591972800 30000
|
||||
|
||||
sleep 1000
|
||||
sql select * from table_rest;
|
||||
print rows: $rows
|
||||
if $rows != 300000 then
|
||||
|
|
|
@ -20,6 +20,10 @@ system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 20000
|
|||
system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 20000
|
||||
system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 20000
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c minTablesPerVnode -v 1000
|
||||
system sh/cfg.sh -n dnode2 -c minTablesPerVnode -v 1000
|
||||
system sh/cfg.sh -n dnode3 -c minTablesPerVnode -v 1000
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 20
|
||||
system sh/cfg.sh -n dnode2 -c maxVgroupsPerDb -v 20
|
||||
system sh/cfg.sh -n dnode3 -c maxVgroupsPerDb -v 20
|
||||
|
|
Loading…
Reference in New Issue