fix sdb bugs
This commit is contained in:
parent
90922fd213
commit
f7908813f5
|
@ -55,9 +55,9 @@ typedef struct {
|
|||
int32_t mnodeInit(SMnodePara para);
|
||||
void mnodeCleanup();
|
||||
|
||||
int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg);
|
||||
void mnodeUnDeploy(char *path);
|
||||
int32_t mnodeStart(char *path, SMnodeCfg *pCfg);
|
||||
int32_t mnodeDeploy(SMnodeCfg *pCfg);
|
||||
void mnodeUnDeploy();
|
||||
int32_t mnodeStart(SMnodeCfg *pCfg);
|
||||
int32_t mnodeAlter(SMnodeCfg *pCfg);
|
||||
void mnodeStop();
|
||||
|
||||
|
|
|
@ -157,8 +157,8 @@ void sdbUnDeploy();
|
|||
|
||||
void *sdbAcquire(ESdbType sdb, void *pKey);
|
||||
void sdbRelease(void *pObj);
|
||||
void *sdbFetch(ESdbType sdb, void *pIter);
|
||||
void sdbCancelFetch(ESdbType sdb, void *pIter);
|
||||
void *sdbFetch(ESdbType sdb, void *pIter, void **ppObj);
|
||||
void sdbCancelFetch(void *pIter);
|
||||
int32_t sdbGetSize(ESdbType sdb);
|
||||
|
||||
SSdbRaw *sdbAllocRaw(ESdbType sdb, int8_t sver, int32_t dataLen);
|
||||
|
|
|
@ -136,8 +136,8 @@ static int32_t dnodeWriteMnodeFile() {
|
|||
char *content = calloc(1, maxLen + 1);
|
||||
|
||||
len += snprintf(content + len, maxLen - len, "{\n");
|
||||
len += snprintf(content + len, maxLen - len, " \"deployed\": \"%d\",\n", tsMnode.dropped);
|
||||
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsMnode.dropped);
|
||||
len += snprintf(content + len, maxLen - len, " \"deployed\": \"%d\",\n", tsMnode.deployed);
|
||||
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\"\n", tsMnode.dropped);
|
||||
len += snprintf(content + len, maxLen - len, "}\n");
|
||||
|
||||
fwrite(content, 1, len, fp);
|
||||
|
@ -180,7 +180,7 @@ static int32_t dnodeStartMnode() {
|
|||
tsMnode.deployed = 1;
|
||||
taosWUnLockLatch(&tsMnode.latch);
|
||||
|
||||
return code;
|
||||
return mnodeStart(NULL);
|
||||
}
|
||||
|
||||
static void dnodeStopMnode() {
|
||||
|
@ -212,14 +212,14 @@ static int32_t dnodeUnDeployMnode() {
|
|||
}
|
||||
|
||||
dnodeStopMnode();
|
||||
mnodeUnDeploy(tsMnodeDir);
|
||||
mnodeUnDeploy();
|
||||
dnodeWriteMnodeFile();
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t dnodeDeployMnode(SMnodeCfg *pCfg) {
|
||||
int32_t code = mnodeDeploy(tsMnodeDir, pCfg);
|
||||
int32_t code = mnodeDeploy(pCfg);
|
||||
if (code != 0) {
|
||||
dError("failed to deploy mnode since %s", tstrerror(code));
|
||||
return code;
|
||||
|
|
|
@ -134,10 +134,11 @@ static int32_t mnodeAllocInitSteps() {
|
|||
}
|
||||
|
||||
static int32_t mnodeAllocStartSteps() {
|
||||
struct SSteps *steps = taosStepInit(7, NULL);
|
||||
struct SSteps *steps = taosStepInit(8, NULL);
|
||||
if (steps == NULL) return -1;
|
||||
|
||||
taosStepAdd(steps, "mnode-timer", mnodeInitTimer, NULL);
|
||||
taosStepAdd(steps, "mnode-sdb-file", sdbRead, (CleanupFp)sdbCommit);
|
||||
taosStepAdd(steps, "mnode-balance", mnodeInitBalance, mnodeCleanupBalance);
|
||||
taosStepAdd(steps, "mnode-profile", mnodeInitProfile, mnodeCleanupProfile);
|
||||
taosStepAdd(steps, "mnode-show", mnodeInitShow, mnodeCleanUpShow);
|
||||
|
@ -170,7 +171,7 @@ int32_t mnodeInit(SMnodePara para) {
|
|||
|
||||
void mnodeCleanup() { taosStepCleanup(tsMint.pInitSteps); }
|
||||
|
||||
int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg) {
|
||||
int32_t mnodeDeploy(SMnodeCfg *pCfg) {
|
||||
if (tsMint.para.dnodeId <= 0 && tsMint.para.clusterId <= 0) {
|
||||
if (sdbDeploy() != 0) {
|
||||
mError("failed to deploy sdb since %s", terrstr());
|
||||
|
@ -182,9 +183,9 @@ int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void mnodeUnDeploy(char *path) { sdbUnDeploy(); }
|
||||
void mnodeUnDeploy() { sdbUnDeploy(); }
|
||||
|
||||
int32_t mnodeStart(char *path, SMnodeCfg *pCfg) { return taosStepExec(tsMint.pStartSteps); }
|
||||
int32_t mnodeStart(SMnodeCfg *pCfg) { return taosStepExec(tsMint.pStartSteps); }
|
||||
|
||||
int32_t mnodeAlter(SMnodeCfg *pCfg) { return 0; }
|
||||
|
||||
|
|
|
@ -73,7 +73,9 @@ static int32_t mnodeAcctActionInsert(SAcctObj *pAcct) { return 0; }
|
|||
static int32_t mnodeAcctActionDelete(SAcctObj *pAcct) { return 0; }
|
||||
|
||||
static int32_t mnodeAcctActionUpdate(SAcctObj *pSrcAcct, SAcctObj *pDstAcct) {
|
||||
memcpy(pDstAcct, pSrcAcct, (int32_t)((char *)&pDstAcct->info - (char *)&pDstAcct));
|
||||
SAcctObj tObj;
|
||||
int32_t len = (int32_t)((int8_t *)&tObj.info - (int8_t *)&tObj);
|
||||
memcpy(pDstAcct, pSrcAcct, len);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ static SSdbRaw *mnodeUserActionEncode(SUserObj *pUser) {
|
|||
int32_t dataPos = 0;
|
||||
SDB_SET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pUser->pass, TSDB_KEY_LEN)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_KEY_LEN)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN)
|
||||
SDB_SET_INT64(pRaw, dataPos, pUser->createdTime)
|
||||
SDB_SET_INT64(pRaw, dataPos, pUser->updateTime)
|
||||
SDB_SET_INT8(pRaw, dataPos, pUser->rootAuth)
|
||||
|
@ -46,7 +46,7 @@ static SSdbRow *mnodeUserActionDecode(SSdbRaw *pRaw) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SSdbRow *pRow = sdbAllocRow(sizeof(SAcctObj));
|
||||
SSdbRow *pRow = sdbAllocRow(sizeof(SUserObj));
|
||||
SUserObj *pUser = sdbGetRowObj(pRow);
|
||||
if (pUser == NULL) return NULL;
|
||||
|
||||
|
@ -92,7 +92,9 @@ static int32_t mnodeUserActionDelete(SUserObj *pUser) {
|
|||
}
|
||||
|
||||
static int32_t mnodeUserActionUpdate(SUserObj *pSrcUser, SUserObj *pDstUser) {
|
||||
memcpy(pDstUser, pSrcUser, (int32_t)((char *)&pDstUser->prohibitDbHash - (char *)&pDstUser));
|
||||
SUserObj tObj;
|
||||
int32_t len = (int32_t)((int8_t *)tObj.prohibitDbHash - (int8_t *)&tObj);
|
||||
memcpy(pDstUser, pSrcUser, len);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -71,6 +71,8 @@ typedef struct {
|
|||
|
||||
extern SSdbMgr tsSdb;
|
||||
|
||||
int32_t sdbWriteImp(SSdbRaw *pRaw);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -66,7 +66,7 @@ static int32_t sdbReadDataFile() {
|
|||
|
||||
char file[PATH_MAX] = {0};
|
||||
snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir);
|
||||
FileFd fd = taosOpenFileCreateWrite(file);
|
||||
FileFd fd = taosOpenFileRead(file);
|
||||
if (fd <= 0) {
|
||||
free(pRaw);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -76,9 +76,12 @@ static int32_t sdbReadDataFile() {
|
|||
|
||||
int64_t offset = 0;
|
||||
int32_t code = 0;
|
||||
int32_t readLen = 0;
|
||||
int64_t ret = 0;
|
||||
|
||||
while (1) {
|
||||
int64_t ret = taosReadFile(fd, pRaw, sizeof(SSdbRaw));
|
||||
readLen = sizeof(SSdbRaw);
|
||||
ret = taosReadFile(fd, pRaw, readLen);
|
||||
if (ret == 0) break;
|
||||
|
||||
if (ret < 0) {
|
||||
|
@ -87,33 +90,34 @@ static int32_t sdbReadDataFile() {
|
|||
break;
|
||||
}
|
||||
|
||||
if (ret < sizeof(SSdbRaw)) {
|
||||
if (ret != readLen) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
mError("failed to read file:%s since %s", file, tstrerror(code));
|
||||
break;
|
||||
}
|
||||
|
||||
ret = taosReadFile(fd, pRaw->pData, pRaw->dataLen + sizeof(int32_t));
|
||||
readLen = pRaw->dataLen + sizeof(int32_t);
|
||||
ret = taosReadFile(fd, pRaw->pData, readLen);
|
||||
if (ret < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to read file:%s since %s", file, tstrerror(code));
|
||||
break;
|
||||
}
|
||||
|
||||
if (ret < pRaw->dataLen + sizeof(int32_t)) {
|
||||
if (ret != readLen) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
mError("failed to read file:%s since %s", file, tstrerror(code));
|
||||
break;
|
||||
}
|
||||
|
||||
uint32_t cksum = *(int32_t *)(pRaw->pData + pRaw->dataLen);
|
||||
if (!taosCheckChecksumWhole(pRaw, sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t)) != 0) {
|
||||
int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
|
||||
if (!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen) != 0) {
|
||||
code = TSDB_CODE_CHECKSUM_ERROR;
|
||||
mError("failed to read file:%s since %s", file, tstrerror(code));
|
||||
break;
|
||||
}
|
||||
|
||||
code = sdbWrite(pRaw);
|
||||
code = sdbWriteImp(pRaw);
|
||||
if (code != 0) {
|
||||
mError("failed to read file:%s since %s", file, terrstr());
|
||||
goto PARSE_SDB_DATA_ERROR;
|
||||
|
@ -150,39 +154,47 @@ static int32_t sdbWriteDataFile() {
|
|||
SRWLatch *pLock = &tsSdb.locks[i];
|
||||
taosWLockLatch(pLock);
|
||||
|
||||
SSdbRow *pRow = taosHashIterate(hash, NULL);
|
||||
while (pRow != NULL) {
|
||||
if (pRow->status != SDB_STATUS_READY) continue;
|
||||
SSdbRow **ppRow = taosHashIterate(hash, NULL);
|
||||
while (ppRow != NULL) {
|
||||
SSdbRow *pRow = *ppRow;
|
||||
if (pRow == NULL || pRow->status != SDB_STATUS_READY) {
|
||||
ppRow = taosHashIterate(hash, ppRow);
|
||||
continue;
|
||||
}
|
||||
|
||||
SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
|
||||
if (pRaw != NULL) {
|
||||
pRaw->status = pRow->status;
|
||||
int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
|
||||
if (taosWriteFile(fd, pRaw, writeLen) != writeLen) {
|
||||
code = TAOS_SYSTEM_ERROR(terrno);
|
||||
taosHashCancelIterate(hash, ppRow);
|
||||
break;
|
||||
}
|
||||
int32_t cksum = taosCalcChecksum(0, pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
|
||||
|
||||
int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
|
||||
if (taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
|
||||
code = TAOS_SYSTEM_ERROR(terrno);
|
||||
taosHashCancelIterate(hash, ppRow);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
taosHashCancelIterate(hash, pRow);
|
||||
code = TSDB_CODE_SDB_APP_ERROR;
|
||||
taosHashCancelIterate(hash, ppRow);
|
||||
break;
|
||||
}
|
||||
|
||||
pRow = taosHashIterate(hash, pRow);
|
||||
ppRow = taosHashIterate(hash, ppRow);
|
||||
}
|
||||
taosWUnLockLatch(pLock);
|
||||
}
|
||||
|
||||
taosCloseFile(fd);
|
||||
|
||||
if (code == 0) {
|
||||
code = taosFsyncFile(fd);
|
||||
}
|
||||
|
||||
taosCloseFile(fd);
|
||||
|
||||
if (code == 0) {
|
||||
char curfile[PATH_MAX] = {0};
|
||||
snprintf(curfile, sizeof(curfile), "%ssdb.data", tsSdb.currDir);
|
||||
|
@ -210,13 +222,11 @@ int32_t sdbRead() {
|
|||
}
|
||||
|
||||
int32_t sdbCommit() {
|
||||
mDebug("start to commit mnode file");
|
||||
mDebug("start to write mnode file");
|
||||
return sdbWriteDataFile();
|
||||
}
|
||||
|
||||
int32_t sdbDeploy() {
|
||||
mDebug("start to deploy mnode");
|
||||
|
||||
if (sdbCreateDir() != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -86,12 +86,12 @@ static int32_t sdbUpdateRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_
|
|||
SRWLatch *pLock = &tsSdb.locks[pRow->sdb];
|
||||
taosRLockLatch(pLock);
|
||||
|
||||
SSdbRow *pDstRow = taosHashGet(hash, pRow->pObj, keySize);
|
||||
if (pDstRow == NULL) {
|
||||
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
|
||||
SSdbRow **ppDstRow = taosHashGet(hash, pRow->pObj, keySize);
|
||||
if (ppDstRow == NULL || *ppDstRow == NULL) {
|
||||
taosRUnLockLatch(pLock);
|
||||
return -1;
|
||||
return sdbInsertRow(hash, pRaw, pRow, keySize);
|
||||
}
|
||||
SSdbRow *pDstRow = *ppDstRow;
|
||||
|
||||
pRow->status = pRaw->status;
|
||||
taosRUnLockLatch(pLock);
|
||||
|
@ -110,12 +110,13 @@ static int32_t sdbDeleteRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_
|
|||
SRWLatch *pLock = &tsSdb.locks[pRow->sdb];
|
||||
taosWLockLatch(pLock);
|
||||
|
||||
SSdbRow *pDstRow = taosHashGet(hash, pRow->pObj, keySize);
|
||||
if (pDstRow == NULL) {
|
||||
SSdbRow **ppDstRow = taosHashGet(hash, pRow->pObj, keySize);
|
||||
if (ppDstRow == NULL || *ppDstRow) {
|
||||
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
|
||||
taosWUnLockLatch(pLock);
|
||||
return -1;
|
||||
}
|
||||
SSdbRow *pDstRow = *ppDstRow;
|
||||
|
||||
pRow->status = pRaw->status;
|
||||
taosHashRemove(hash, pRow->pObj, keySize);
|
||||
|
@ -123,16 +124,14 @@ static int32_t sdbDeleteRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_
|
|||
|
||||
SdbDeleteFp deleteFp = tsSdb.deleteFps[pRow->sdb];
|
||||
if (deleteFp != NULL) {
|
||||
if ((*deleteFp)(pRow->pObj) != 0) {
|
||||
return -1;
|
||||
}
|
||||
(void)(*deleteFp)(pRow->pObj);
|
||||
}
|
||||
|
||||
sdbRelease(pRow->pObj);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t sdbWrite(SSdbRaw *pRaw) {
|
||||
int32_t sdbWriteImp(SSdbRaw *pRaw) {
|
||||
SHashObj *hash = sdbGetHash(pRaw->sdb);
|
||||
if (hash == NULL) return -1;
|
||||
|
||||
|
@ -171,6 +170,12 @@ int32_t sdbWrite(SSdbRaw *pRaw) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t sdbWrite(SSdbRaw *pRaw) {
|
||||
int32_t code = sdbWriteImp(pRaw);
|
||||
sdbFreeRaw(pRaw);
|
||||
return code;
|
||||
}
|
||||
|
||||
void *sdbAcquire(ESdbType sdb, void *pKey) {
|
||||
SHashObj *hash = sdbGetHash(sdb);
|
||||
if (hash == NULL) return NULL;
|
||||
|
@ -182,7 +187,7 @@ void *sdbAcquire(ESdbType sdb, void *pKey) {
|
|||
taosRLockLatch(pLock);
|
||||
|
||||
SSdbRow **ppRow = taosHashGet(hash, pKey, keySize);
|
||||
if (ppRow == NULL || *ppRow) {
|
||||
if (ppRow == NULL || *ppRow == NULL) {
|
||||
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
|
||||
taosRUnLockLatch(pLock);
|
||||
return NULL;
|
||||
|
@ -226,27 +231,37 @@ void sdbRelease(void *pObj) {
|
|||
taosRUnLockLatch(pLock);
|
||||
}
|
||||
|
||||
void *sdbFetchRow(ESdbType sdb, void *pIter) {
|
||||
void *sdbFetch(ESdbType sdb, void *pIter, void **ppObj) {
|
||||
SHashObj *hash = sdbGetHash(sdb);
|
||||
if (hash == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
if (hash == NULL) return NULL;
|
||||
|
||||
SRWLatch *pLock = &tsSdb.locks[sdb];
|
||||
taosRLockLatch(pLock);
|
||||
void *pRet = taosHashIterate(hash, pIter);
|
||||
|
||||
SSdbRow **ppRow = taosHashIterate(hash, ppRow);
|
||||
while (ppRow != NULL) {
|
||||
SSdbRow *pRow = *ppRow;
|
||||
if (pRow == NULL || pRow->status != SDB_STATUS_READY) {
|
||||
ppRow = taosHashIterate(hash, ppRow);
|
||||
continue;
|
||||
}
|
||||
|
||||
atomic_add_fetch_32(&pRow->refCount, 1);
|
||||
*ppObj = pRow->pObj;
|
||||
break;
|
||||
}
|
||||
taosRUnLockLatch(pLock);
|
||||
|
||||
return pRet;
|
||||
return ppRow;
|
||||
}
|
||||
|
||||
void sdbCancelFetch(ESdbType sdb, void *pIter) {
|
||||
SHashObj *hash = sdbGetHash(sdb);
|
||||
if (hash == NULL) {
|
||||
return;
|
||||
}
|
||||
void sdbCancelFetch(void *pIter) {
|
||||
if (pIter == NULL) return;
|
||||
SSdbRow *pRow = *(SSdbRow **)pIter;
|
||||
SHashObj *hash = sdbGetHash(pRow->sdb);
|
||||
if (hash == NULL) return;
|
||||
|
||||
SRWLatch *pLock = &tsSdb.locks[sdb];
|
||||
SRWLatch *pLock = &tsSdb.locks[pRow->sdb];
|
||||
taosRLockLatch(pLock);
|
||||
taosHashCancelIterate(hash, pIter);
|
||||
taosRUnLockLatch(pLock);
|
||||
|
@ -254,9 +269,7 @@ void sdbCancelFetch(ESdbType sdb, void *pIter) {
|
|||
|
||||
int32_t sdbGetSize(ESdbType sdb) {
|
||||
SHashObj *hash = sdbGetHash(sdb);
|
||||
if (hash == NULL) {
|
||||
return 0;
|
||||
}
|
||||
if (hash == NULL) return 0;
|
||||
|
||||
SRWLatch *pLock = &tsSdb.locks[sdb];
|
||||
taosRLockLatch(pLock);
|
||||
|
|
Loading…
Reference in New Issue