458 lines
12 KiB
C
458 lines
12 KiB
C
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
|
|
|
|
#define _DEFAULT_SOURCE
|
|
#include "sdbInt.h"
|
|
#include "tchecksum.h"
|
|
#include "wal.h"
|
|
|
|
#define SDB_TABLE_SIZE 24
|
|
#define SDB_RESERVE_SIZE 512
|
|
#define SDB_FILE_VER 1
|
|
|
|
/*
 * Invoke every registered deploy callback of the sdb.
 *
 * The slots of pSdb->deployFps are visited from index SDB_MAX - 1 down to 0;
 * empty (NULL) slots are skipped. Each callback receives pSdb->pMnode.
 *
 * Returns 0 when every callback returned 0, or -1 as soon as one of them
 * returns a non-zero value (the failure is logged with terrstr()).
 */
static int32_t sdbRunDeployFp(SSdb *pSdb) {
  mDebug("start to deploy sdb");

  int32_t type = SDB_MAX;
  while (--type >= 0) {
    SdbDeployFp deployFp = pSdb->deployFps[type];

    /* Only slots holding a callback are executed. */
    if (deployFp != NULL && (*deployFp)(pSdb->pMnode) != 0) {
      mError("failed to deploy sdb:%s since %s", sdbTableName(type), terrstr());
      return -1;
    }
  }

  mDebug("sdb deploy successfully");
  return 0;
}
|
|
|
|
/*
 * Read exactly `size` bytes from pFile into pBuf.
 *
 * Returns 0 on success. On failure returns -1 and sets terrno:
 *   - TAOS_SYSTEM_ERROR(errno) when taosReadFile reports a negative result;
 *   - TSDB_CODE_FILE_CORRUPTED when fewer bytes than requested were read.
 */
static int32_t sdbReadFileField(TdFilePtr pFile, void *pBuf, int32_t size) {
  int64_t ret = taosReadFile(pFile, pBuf, size);
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != size) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }
  return 0;
}

/*
 * Read the fixed-size header of the sdb data file into pSdb.
 *
 * Layout consumed, in order (mirrors what sdbWriteFileHead emits):
 *   1. int64 file format version, which must equal SDB_FILE_VER;
 *   2. int64 stored into pSdb->curVer;
 *   3. int64 stored into pSdb->curTerm;
 *   4. SDB_TABLE_SIZE int64 values, the first SDB_MAX go to pSdb->maxId[];
 *   5. SDB_TABLE_SIZE int64 values, the first SDB_MAX go to pSdb->tableVer[];
 *   6. SDB_RESERVE_SIZE reserved bytes, read and discarded.
 *
 * Returns 0 on success, -1 on failure with terrno set. On failure pSdb may
 * already have been partially updated.
 */
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
  int64_t sver = 0;
  if (sdbReadFileField(pFile, &sver, sizeof(int64_t)) != 0) {
    return -1;
  }
  if (sver != SDB_FILE_VER) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  if (sdbReadFileField(pFile, &pSdb->curVer, sizeof(int64_t)) != 0) {
    return -1;
  }

  if (sdbReadFileField(pFile, &pSdb->curTerm, sizeof(int64_t)) != 0) {
    return -1;
  }

  /* The file always carries SDB_TABLE_SIZE slots; slots at or beyond SDB_MAX
   * are consumed from the file but not stored. */
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
    int64_t maxId = 0;
    if (sdbReadFileField(pFile, &maxId, sizeof(int64_t)) != 0) {
      return -1;
    }
    if (i < SDB_MAX) {
      pSdb->maxId[i] = maxId;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
    int64_t ver = 0;
    if (sdbReadFileField(pFile, &ver, sizeof(int64_t)) != 0) {
      return -1;
    }
    if (i < SDB_MAX) {
      pSdb->tableVer[i] = ver;
    }
  }

  /* Skip over the reserved area; its content is not interpreted. */
  char reserve[SDB_RESERVE_SIZE] = {0};
  if (sdbReadFileField(pFile, reserve, sizeof(reserve)) != 0) {
    return -1;
  }

  return 0;
}
|
|
|
|
/*
 * Write exactly `size` bytes from pBuf to pFile.
 *
 * Returns 0 on success. When taosWriteFile reports anything other than
 * `size`, returns -1 and sets terrno to TAOS_SYSTEM_ERROR(errno).
 */
static int32_t sdbWriteFileField(TdFilePtr pFile, void *pBuf, int32_t size) {
  if (taosWriteFile(pFile, pBuf, size) != size) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  return 0;
}

/*
 * Write the fixed-size header of the sdb data file from pSdb.
 *
 * Layout produced, in order:
 *   1. int64 file format version (SDB_FILE_VER);
 *   2. pSdb->curVer (int64-sized);
 *   3. pSdb->curTerm (int64-sized);
 *   4. SDB_TABLE_SIZE int64 values: pSdb->maxId[i] for i < SDB_MAX, else 0;
 *   5. SDB_TABLE_SIZE int64 values: pSdb->tableVer[i] for i < SDB_MAX, else 0;
 *   6. SDB_RESERVE_SIZE zero bytes.
 *
 * Returns 0 on success, -1 on the first failed write with terrno set.
 */
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
  int64_t sver = SDB_FILE_VER;
  if (sdbWriteFileField(pFile, &sver, sizeof(int64_t)) != 0) {
    return -1;
  }

  if (sdbWriteFileField(pFile, &pSdb->curVer, sizeof(int64_t)) != 0) {
    return -1;
  }

  if (sdbWriteFileField(pFile, &pSdb->curTerm, sizeof(int64_t)) != 0) {
    return -1;
  }

  /* Always emit SDB_TABLE_SIZE slots so the header size does not depend on
   * SDB_MAX; slots without a table are written as 0. */
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
    int64_t maxId = 0;
    if (i < SDB_MAX) {
      maxId = pSdb->maxId[i];
    }
    if (sdbWriteFileField(pFile, &maxId, sizeof(int64_t)) != 0) {
      return -1;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
    int64_t ver = 0;
    if (i < SDB_MAX) {
      ver = pSdb->tableVer[i];
    }
    if (sdbWriteFileField(pFile, &ver, sizeof(int64_t)) != 0) {
      return -1;
    }
  }

  char reserve[SDB_RESERVE_SIZE] = {0};
  if (sdbWriteFileField(pFile, reserve, sizeof(reserve)) != 0) {
    return -1;
  }

  return 0;
}
|
|
|
|
/*
 * Load "<currDir>/sdb.data" into the in-memory sdb.
 *
 * The file header is read with sdbReadFileHead, then records are read one by
 * one. Each record is an SSdbRaw header, followed by pRaw->dataLen payload
 * bytes and an int32 checksum; a record that passes the checksum test is
 * applied with sdbWriteWithoutFree.
 *
 * pSdb->tableVer as read from the header is saved before the records are
 * applied and restored after a fully successful load.
 *
 * Returns:
 *   0        on success, and also when the data file cannot be opened
 *            (terrno is still set and an error is logged in that case);
 *   -1       when memory allocation or header reading fails (terrno set);
 *   non-zero error code when reading, validating or applying a record fails
 *            (terrno is set to the same code).
 */
int32_t sdbReadFile(SSdb *pSdb) {
  int32_t code = 0;
  int32_t readLen = 0;
  int64_t ret = 0;

  /* Size of the single buffer that holds one whole record. */
  const int64_t bufLen = WAL_MAX_SIZE + 100;

  SSdbRaw *pRaw = taosMemoryMalloc(bufLen);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed read file since %s", terrstr());
    return -1;
  }

  char file[PATH_MAX] = {0};
  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  mDebug("start to read file:%s", file);

  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    taosMemoryFree(pRaw);
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to read file:%s since %s", file, terrstr());
    /* NOTE(review): an unopenable file is reported as success; this looks
     * deliberate (e.g. no data file yet) and is kept as is - confirm. */
    return 0;
  }

  if (sdbReadFileHead(pSdb, pFile) != 0) {
    mError("failed to read file:%s head since %s", file, terrstr());
    pSdb->curVer = -1;
    pSdb->curTerm = -1;
    taosMemoryFree(pRaw);
    taosCloseFile(&pFile);
    return -1;
  }

  /* Remember the table versions from the header so they can be restored once
   * all records have been applied. */
  int64_t tableVer[SDB_MAX] = {0};
  memcpy(tableVer, pSdb->tableVer, sizeof(tableVer));

  while (1) {
    /* Record header. A clean end-of-file here terminates the loop. */
    readLen = sizeof(SSdbRaw);
    ret = taosReadFile(pFile, pRaw, readLen);
    if (ret == 0) break;

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    /* dataLen comes from the file: make sure header + payload + checksum fit
     * into the buffer before reading the payload into it. */
    int64_t dataLen = pRaw->dataLen;
    if (dataLen < 0 || (int64_t)sizeof(SSdbRaw) + dataLen + (int64_t)sizeof(int32_t) > bufLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s, invalid dataLen:%" PRId64, file, tstrerror(code), dataLen);
      goto _OVER;
    }

    /* Payload followed by the int32 checksum. */
    readLen = (int32_t)(dataLen + (int64_t)sizeof(int32_t));
    ret = taosReadFile(pFile, pRaw->pData, readLen);
    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
    if (!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) {
      code = TSDB_CODE_CHECKSUM_ERROR;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    code = sdbWriteWithoutFree(pSdb, pRaw);
    if (code != 0) {
      mError("failed to read file:%s since %s", file, terrstr());
      goto _OVER;
    }
  }

  /* Only reached after a clean end-of-file: every failure above jumps to
   * _OVER directly so its error code is not lost. */
  pSdb->lastCommitVer = pSdb->curVer;
  memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
  mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer);

_OVER:
  taosCloseFile(&pFile);
  sdbFreeRaw(pRaw);

  terrno = code;
  return code;
}
|
|
|
|
/*
 * Serialize the whole sdb into "<tmpDir>/sdb.data" and, on success, rename it
 * to "<currDir>/sdb.data".
 *
 * After the header (sdbWriteFileHead), every table that has an encode
 * callback is visited from index SDB_MAX - 1 down to 0 while holding that
 * table's lock. Rows whose status is SDB_STATUS_READY or SDB_STATUS_DROPPING
 * are encoded and written as: SSdbRaw header + payload, followed by an int32
 * checksum over those bytes. Other rows are skipped.
 *
 * Writing stops at the first failure; the temporary file is then closed
 * without being synced or renamed.
 *
 * Returns 0 on success (pSdb->lastCommitVer is set to pSdb->curVer), -1 when
 * the temporary file cannot be opened or its header cannot be written, or a
 * non-zero error code otherwise. terrno is set accordingly.
 */
static int32_t sdbWriteFileImp(SSdb *pSdb) {
  int32_t code = 0;

  char tmpfile[PATH_MAX] = {0};
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
  char curfile[PATH_MAX] = {0};
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);

  mDebug("start to write file:%s, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
         pSdb->curTerm, pSdb->lastCommitVer);

  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open file:%s for write since %s", tmpfile, terrstr());
    return -1;
  }

  if (sdbWriteFileHead(pSdb, pFile) != 0) {
    mError("failed to write file:%s head since %s", tmpfile, terrstr());
    taosCloseFile(&pFile);
    return -1;
  }

  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
    if (encodeFp == NULL) continue;

    mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));

    SHashObj       *hash = pSdb->hashObjs[i];
    TdThreadRwlock *pLock = &pSdb->locks[i];
    taosThreadRwlockWrlock(pLock);

    for (SSdbRow **ppRow = taosHashIterate(hash, NULL); ppRow != NULL; ppRow = taosHashIterate(hash, ppRow)) {
      SSdbRow *pRow = *ppRow;
      if (pRow == NULL) continue;

      if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) {
        sdbPrintOper(pSdb, pRow, "not-write");
        continue;
      }

      sdbPrintOper(pSdb, pRow, "write");

      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
      if (pRaw == NULL) {
        code = TSDB_CODE_SDB_APP_ERROR;
      } else {
        pRaw->status = pRow->status;
        int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;

        if (taosWriteFile(pFile, pRaw, writeLen) != writeLen) {
          code = TAOS_SYSTEM_ERROR(errno);
        } else {
          /* The checksum covers the raw header and payload just written. */
          int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
          if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
            code = TAOS_SYSTEM_ERROR(errno);
          }
        }

        sdbFreeRaw(pRaw);
      }

      if (code != 0) {
        /* Leaving the iteration early: release the iterator explicitly. */
        taosHashCancelIterate(hash, ppRow);
        break;
      }
    }

    taosThreadRwlockUnlock(pLock);

    /* The file is already unusable; do not lock, encode and write the
     * remaining tables, and keep the first error code. */
    if (code != 0) break;
  }

  if (code == 0) {
    code = taosFsyncFile(pFile);
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to sync file:%s since %s", tmpfile, tstrerror(code));
    }
  }

  taosCloseFile(&pFile);

  if (code == 0) {
    code = taosRenameFile(tmpfile, curfile);
    if (code != 0) {
      /* Logged once by the common failure branch below. */
      code = TAOS_SYSTEM_ERROR(errno);
    }
  }

  if (code != 0) {
    mError("failed to write file:%s since %s", curfile, tstrerror(code));
  } else {
    pSdb->lastCommitVer = pSdb->curVer;
    mDebug("write file:%s successfully, ver:%" PRId64 " term:%" PRId64, curfile, pSdb->lastCommitVer, pSdb->curTerm);
  }

  terrno = code;
  return code;
}
|
|
|
|
/*
 * Persist the sdb to disk if it changed since the last commit.
 *
 * When pSdb->curVer equals pSdb->lastCommitVer nothing is written and 0 is
 * returned; otherwise the result of sdbWriteFileImp is returned.
 */
int32_t sdbWriteFile(SSdb *pSdb) {
  if (pSdb->curVer != pSdb->lastCommitVer) {
    return sdbWriteFileImp(pSdb);
  }

  /* Versions match: the on-disk file is already up to date. */
  return 0;
}
|
|
|
|
/*
 * Deploy the sdb: run all deploy callbacks, then unconditionally write the
 * data file.
 *
 * Returns 0 when both steps succeed and -1 when either sdbRunDeployFp or
 * sdbWriteFileImp reports a failure. The file is not written if the deploy
 * callbacks fail.
 */
int32_t sdbDeploy(SSdb *pSdb) {
  /* Short-circuit evaluation keeps the write from running after a failed deploy. */
  if (sdbRunDeployFp(pSdb) != 0 || sdbWriteFileImp(pSdb) != 0) {
    return -1;
  }

  return 0;
}
|
|
|
|
/*
 * Create an iterator for reading a snapshot of the sdb data file.
 *
 * "<currDir>/sdb.data" is first copied to "<tmpDir>/sdb.data"; the copy is
 * then opened for reading and attached to a newly allocated, zero-initialized
 * SSdbIter.
 *
 * Returns the iterator on success. Returns NULL with terrno set when the
 * copy fails, the iterator cannot be allocated, or the copy cannot be opened.
 */
SSdbIter *sdbIterInit(SSdb *pSdb) {
  char datafile[PATH_MAX] = {0};
  char tmpfile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  /* Bound the write by the destination buffer itself, not by datafile. */
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);

  if (taosCopyFile(datafile, tmpfile) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr());
    return NULL;
  }

  SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
  if (pIter == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pIter->file = taosOpenFile(tmpfile, TD_FILE_READ);
  if (pIter->file == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to read snapshot file:%s since %s", tmpfile, terrstr());
    taosMemoryFree(pIter);
    return NULL;
  }

  mDebug("start to read snapshot file:%s, iter:%p", tmpfile, pIter);
  return pIter;
}
|
|
|
|
/*
 * Read the next chunk (at most 100 bytes) of the snapshot file behind pIter.
 *
 * pSdb   - not used by this function.
 * pIter  - iterator returned by sdbIterInit.
 * ppBuf  - out: newly allocated buffer holding the chunk; ownership passes to
 *          the caller. Set to NULL whenever NULL is returned.
 * buflen - out: number of valid bytes in *ppBuf. Set to 0 whenever NULL is
 *          returned.
 *
 * Returns pIter when a chunk was read. Returns NULL at end of file or on
 * error; in every NULL case the iterator's file has been closed and the
 * iterator itself freed, so pIter must not be used again. On error terrno is
 * set (TSDB_CODE_OUT_OF_MEMORY or TAOS_SYSTEM_ERROR(errno)).
 */
SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *pIter, char **ppBuf, int32_t *buflen) {
  const int32_t maxlen = 100;

  /* Give the outputs a defined value on every path. */
  *ppBuf = NULL;
  *buflen = 0;

  char *pBuf = taosMemoryCalloc(1, maxlen);
  if (pBuf == NULL) {
    /* A NULL return always means the iterator is gone; release it here too
     * so it is not leaked. NOTE(review): assumes no caller retries with the
     * same pIter after an out-of-memory NULL - confirm. */
    taosCloseFile(&pIter->file);
    taosMemoryFree(pIter);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen);
  if (readlen > 0) {
    pIter->readlen += readlen;
    mTrace("read snapshot, readlen:%" PRId64, pIter->readlen);
    *ppBuf = pBuf;
    *buflen = readlen;
    return pIter;
  }

  if (readlen == 0) {
    mTrace("read snapshot to the end, readlen:%" PRId64, pIter->readlen);
  } else {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to read snapshot since %s, readlen:%" PRId64, terrstr(), pIter->readlen);
  }

  /* End of file and read error share the same teardown. */
  taosMemoryFree(pBuf);
  taosCloseFile(&pIter->file);
  taosMemoryFree(pIter);
  return NULL;
}
|