[TD-1920]load mininal modules,and disable sync wal when compact mnode wal

This commit is contained in:
lichuang 2021-05-28 19:37:47 +08:00
parent a0533072db
commit 35e2285328
4 changed files with 33 additions and 7 deletions

View File

@ -86,6 +86,17 @@ static SStep tsDnodeSteps[] = {
{"dnode-telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry}, {"dnode-telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry},
}; };
static SStep tsDnodeCompactSteps[] = {
{"dnode-tfile", tfInit, tfCleanup},
{"dnode-storage", dnodeInitStorage, dnodeCleanupStorage},
{"dnode-eps", dnodeInitEps, dnodeCleanupEps},
{"dnode-wal", walInit, walCleanUp},
{"dnode-mread", dnodeInitMRead, NULL},
{"dnode-mwrite", dnodeInitMWrite, NULL},
{"dnode-mpeer", dnodeInitMPeer, NULL},
{"dnode-modules", dnodeInitModules, dnodeCleanupModules},
};
static int dnodeCreateDir(const char *dir) { static int dnodeCreateDir(const char *dir) {
if (mkdir(dir, 0755) != 0 && errno != EEXIST) { if (mkdir(dir, 0755) != 0 && errno != EEXIST) {
return -1; return -1;
@ -95,13 +106,23 @@ static int dnodeCreateDir(const char *dir) {
} }
static void dnodeCleanupComponents() { static void dnodeCleanupComponents() {
if (!tsCompactMnodeWal) {
int32_t stepSize = sizeof(tsDnodeSteps) / sizeof(SStep); int32_t stepSize = sizeof(tsDnodeSteps) / sizeof(SStep);
dnodeStepCleanup(tsDnodeSteps, stepSize); dnodeStepCleanup(tsDnodeSteps, stepSize);
} else {
int32_t stepSize = sizeof(tsDnodeCompactSteps) / sizeof(SStep);
dnodeStepCleanup(tsDnodeCompactSteps, stepSize);
}
} }
static int32_t dnodeInitComponents() { static int32_t dnodeInitComponents() {
if (!tsCompactMnodeWal) {
int32_t stepSize = sizeof(tsDnodeSteps) / sizeof(SStep); int32_t stepSize = sizeof(tsDnodeSteps) / sizeof(SStep);
return dnodeStepInit(tsDnodeSteps, stepSize); return dnodeStepInit(tsDnodeSteps, stepSize);
} else {
int32_t stepSize = sizeof(tsDnodeCompactSteps) / sizeof(SStep);
return dnodeStepInit(tsDnodeCompactSteps, stepSize);
}
} }
static int32_t dnodeInitTmr() { static int32_t dnodeInitTmr() {

View File

@ -121,7 +121,7 @@ int32_t mnodeStartSystem() {
int32_t mnodeInitSystem() { int32_t mnodeInitSystem() {
mnodeInitTimer(); mnodeInitTimer();
if (mnodeNeedStart()) { if (mnodeNeedStart() || tsCompactMnodeWal) {
return mnodeStartSystem(); return mnodeStartSystem();
} }
return 0; return 0;

View File

@ -690,7 +690,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
pthread_mutex_unlock(&tsSdbMgmt.mutex); pthread_mutex_unlock(&tsSdbMgmt.mutex);
// from app, row is created // from app, row is created
if (pRow != NULL) { if (pRow != NULL && tsCompactMnodeWal != 1) {
// forward to peers // forward to peers
pRow->processedCount = 0; pRow->processedCount = 0;
int32_t syncCode = syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC, false); int32_t syncCode = syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC, false);
@ -713,7 +713,9 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version); actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version);
// even it is WAL/FWD, it shall be called to update version in sync // even it is WAL/FWD, it shall be called to update version in sync
if (tsCompactMnodeWal != 1) {
syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC, false); syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC, false);
}
// from wal or forward msg, row not created, should add into hash // from wal or forward msg, row not created, should add into hash
if (action == SDB_ACTION_INSERT) { if (action == SDB_ACTION_INSERT) {

View File

@ -383,6 +383,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset); pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
break;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, tfd, offset); walFtruncate(pWal, tfd, offset);
break; break;
@ -430,6 +431,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
pWal->vgId, fileId, pHead->version, pWal->version, pHead->len, offset); pWal->vgId, fileId, pHead->version, pWal->version, pHead->len, offset);
pWal->version = pHead->version; pWal->version = pHead->version;
//wInfo("writeFp: %ld", offset);
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL); (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL);
} }