Merge pull request #4257 from taosdata/feature/wal

Feature/wal
This commit is contained in:
Shengliang Guan 2020-11-17 10:34:01 +08:00 committed by GitHub
commit a5003c23ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 22 additions and 3 deletions

View File

@ -151,6 +151,13 @@ void dnodeCleanupClient() {
} }
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (dnodeGetRunStatus() != TSDB_RUN_STATUS_RUNING) {
if (pMsg == NULL || pMsg->pCont == NULL) return;
dDebug("msg:%p is ignored since dnode not running", pMsg);
rpcFreeCont(pMsg->pCont);
return;
}
if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) {
dnodeUpdateEpSetForPeer(pEpSet); dnodeUpdateEpSetForPeer(pEpSet);
} }

View File

@ -201,6 +201,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "Missing da
TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "Out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "Out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected generic error in vnode") TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected generic error in vnode")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid version file") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid version file")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Vnode memory is full for commit is failed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied")

View File

@ -41,6 +41,7 @@ typedef struct {
int8_t status; int8_t status;
int8_t role; int8_t role;
int8_t accessState; int8_t accessState;
int8_t isFull;
uint64_t version; // current version uint64_t version; // current version
uint64_t fversion; // version on saved data file uint64_t fversion; // version on saved data file
void *wqueue; void *wqueue;

View File

@ -381,6 +381,7 @@ int32_t vnodeClose(int32_t vgId) {
void vnodeRelease(void *pVnodeRaw) { void vnodeRelease(void *pVnodeRaw) {
if (pVnodeRaw == NULL) return; if (pVnodeRaw == NULL) return;
SVnodeObj *pVnode = pVnodeRaw; SVnodeObj *pVnode = pVnodeRaw;
int32_t code = 0;
int32_t vgId = pVnode->vgId; int32_t vgId = pVnode->vgId;
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
@ -406,7 +407,7 @@ void vnodeRelease(void *pVnodeRaw) {
} }
if (pVnode->tsdb) { if (pVnode->tsdb) {
tsdbCloseRepo(pVnode->tsdb, 1); code = tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL; pVnode->tsdb = NULL;
} }
@ -418,7 +419,7 @@ void vnodeRelease(void *pVnodeRaw) {
} }
if (pVnode->wal) { if (pVnode->wal) {
walRemoveAllOldFiles(pVnode->wal); if (code == 0) walRemoveAllOldFiles(pVnode->wal);
walClose(pVnode->wal); walClose(pVnode->wal);
pVnode->wal = NULL; pVnode->wal = NULL;
} }
@ -594,7 +595,10 @@ static int vnodeProcessTsdbStatus(void *arg, int status, int eno) {
SVnodeObj *pVnode = arg; SVnodeObj *pVnode = arg;
if (eno != TSDB_CODE_SUCCESS) { if (eno != TSDB_CODE_SUCCESS) {
// TODO: deal with the error here vError("vgId:%d, failed to commit since %s, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, tstrerror(eno),
pVnode->fversion, pVnode->version);
pVnode->isFull = 1;
return 0;
} }
if (status == TSDB_STATUS_COMMIT_START) { if (status == TSDB_STATUS_COMMIT_START) {
@ -609,6 +613,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status, int eno) {
if (status == TSDB_STATUS_COMMIT_OVER) { if (status == TSDB_STATUS_COMMIT_OVER) {
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version); vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
pVnode->isFull = 0;
walRemoveOneOldFile(pVnode->wal); walRemoveOneOldFile(pVnode->wal);
return vnodeSaveVersion(pVnode); return vnodeSaveVersion(pVnode);
} }

View File

@ -119,6 +119,11 @@ static int32_t vnodeCheckWrite(void *param) {
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
if (pVnode->isFull) {
vDebug("vgId:%d, vnode is full, refCount:%d", pVnode->vgId, pVnode->refCount);
return TSDB_CODE_VND_IS_FULL;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }