Merge pull request #26835 from taosdata/fix/TD-30989-scan1-11
Fix/td 30989-scan1-11
This commit is contained in:
commit
b8f0bf929f
|
@ -46,7 +46,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (ESdbType i = 0; i < SDB_MAX; ++i) {
|
for (ESdbType i = 0; i < SDB_MAX; ++i) {
|
||||||
taosThreadRwlockInit(&pSdb->locks[i], NULL);
|
(void)taosThreadRwlockInit(&pSdb->locks[i], NULL);
|
||||||
pSdb->maxId[i] = 0;
|
pSdb->maxId[i] = 0;
|
||||||
pSdb->tableVer[i] = 0;
|
pSdb->tableVer[i] = 0;
|
||||||
pSdb->keyTypes[i] = SDB_KEY_INT32;
|
pSdb->keyTypes[i] = SDB_KEY_INT32;
|
||||||
|
@ -60,7 +60,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
|
||||||
pSdb->commitTerm = -1;
|
pSdb->commitTerm = -1;
|
||||||
pSdb->commitConfig = -1;
|
pSdb->commitConfig = -1;
|
||||||
pSdb->pMnode = pOption->pMnode;
|
pSdb->pMnode = pOption->pMnode;
|
||||||
taosThreadMutexInit(&pSdb->filelock, NULL);
|
(void)taosThreadMutexInit(&pSdb->filelock, NULL);
|
||||||
mInfo("sdb init success");
|
mInfo("sdb init success");
|
||||||
return pSdb;
|
return pSdb;
|
||||||
}
|
}
|
||||||
|
@ -68,7 +68,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
|
||||||
void sdbCleanup(SSdb *pSdb) {
|
void sdbCleanup(SSdb *pSdb) {
|
||||||
mInfo("start to cleanup sdb");
|
mInfo("start to cleanup sdb");
|
||||||
|
|
||||||
sdbWriteFile(pSdb, 0);
|
(void)sdbWriteFile(pSdb, 0);
|
||||||
|
|
||||||
if (pSdb->currDir != NULL) {
|
if (pSdb->currDir != NULL) {
|
||||||
taosMemoryFreeClear(pSdb->currDir);
|
taosMemoryFreeClear(pSdb->currDir);
|
||||||
|
@ -99,14 +99,14 @@ void sdbCleanup(SSdb *pSdb) {
|
||||||
|
|
||||||
taosHashClear(hash);
|
taosHashClear(hash);
|
||||||
taosHashCleanup(hash);
|
taosHashCleanup(hash);
|
||||||
taosThreadRwlockDestroy(&pSdb->locks[i]);
|
(void)taosThreadRwlockDestroy(&pSdb->locks[i]);
|
||||||
pSdb->hashObjs[i] = NULL;
|
pSdb->hashObjs[i] = NULL;
|
||||||
memset(&pSdb->locks[i], 0, sizeof(pSdb->locks[i]));
|
memset(&pSdb->locks[i], 0, sizeof(pSdb->locks[i]));
|
||||||
|
|
||||||
mInfo("sdb table:%s is cleaned up", sdbTableName(i));
|
mInfo("sdb table:%s is cleaned up", sdbTableName(i));
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexDestroy(&pSdb->filelock);
|
(void)taosThreadMutexDestroy(&pSdb->filelock);
|
||||||
taosMemoryFree(pSdb);
|
taosMemoryFree(pSdb);
|
||||||
mInfo("sdb is cleaned up");
|
mInfo("sdb is cleaned up");
|
||||||
}
|
}
|
||||||
|
@ -188,19 +188,19 @@ void sdbGetCommitInfo(SSdb *pSdb, int64_t *index, int64_t *term, int64_t *config
|
||||||
void sdbWriteLock(SSdb *pSdb, int32_t type) {
|
void sdbWriteLock(SSdb *pSdb, int32_t type) {
|
||||||
TdThreadRwlock *pLock = &pSdb->locks[type];
|
TdThreadRwlock *pLock = &pSdb->locks[type];
|
||||||
// mTrace("sdb table:%d start write lock:%p", type, pLock);
|
// mTrace("sdb table:%d start write lock:%p", type, pLock);
|
||||||
taosThreadRwlockWrlock(pLock);
|
(void)taosThreadRwlockWrlock(pLock);
|
||||||
// mTrace("sdb table:%d stop write lock:%p", type, pLock);
|
// mTrace("sdb table:%d stop write lock:%p", type, pLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
void sdbReadLock(SSdb *pSdb, int32_t type) {
|
void sdbReadLock(SSdb *pSdb, int32_t type) {
|
||||||
TdThreadRwlock *pLock = &pSdb->locks[type];
|
TdThreadRwlock *pLock = &pSdb->locks[type];
|
||||||
// mTrace("sdb table:%d start read lock:%p", type, pLock);
|
// mTrace("sdb table:%d start read lock:%p", type, pLock);
|
||||||
taosThreadRwlockRdlock(pLock);
|
(void)taosThreadRwlockRdlock(pLock);
|
||||||
// mTrace("sdb table:%d stop read lock:%p", type, pLock);
|
// mTrace("sdb table:%d stop read lock:%p", type, pLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
void sdbUnLock(SSdb *pSdb, int32_t type) {
|
void sdbUnLock(SSdb *pSdb, int32_t type) {
|
||||||
TdThreadRwlock *pLock = &pSdb->locks[type];
|
TdThreadRwlock *pLock = &pSdb->locks[type];
|
||||||
// mTrace("sdb table:%d unlock:%p", type, pLock);
|
// mTrace("sdb table:%d unlock:%p", type, pLock);
|
||||||
taosThreadRwlockUnlock(pLock);
|
(void)taosThreadRwlockUnlock(pLock);
|
||||||
}
|
}
|
||||||
|
|
|
@ -258,7 +258,7 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("failed to read sdb file:%s head since %s", file, tstrerror(code));
|
mError("failed to read sdb file:%s head since %s", file, tstrerror(code));
|
||||||
taosMemoryFree(pRaw);
|
taosMemoryFree(pRaw);
|
||||||
taosCloseFile(&pFile);
|
(void)taosCloseFile(&pFile);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -361,14 +361,14 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
|
||||||
pSdb->commitTerm, pSdb->commitConfig);
|
pSdb->commitTerm, pSdb->commitConfig);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
taosCloseFile(&pFile);
|
(void)taosCloseFile(&pFile);
|
||||||
sdbFreeRaw(pRaw);
|
sdbFreeRaw(pRaw);
|
||||||
|
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sdbReadFile(SSdb *pSdb) {
|
int32_t sdbReadFile(SSdb *pSdb) {
|
||||||
taosThreadMutexLock(&pSdb->filelock);
|
(void)taosThreadMutexLock(&pSdb->filelock);
|
||||||
|
|
||||||
sdbResetData(pSdb);
|
sdbResetData(pSdb);
|
||||||
int32_t code = sdbReadFileImp(pSdb);
|
int32_t code = sdbReadFileImp(pSdb);
|
||||||
|
@ -377,7 +377,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
|
||||||
sdbResetData(pSdb);
|
sdbResetData(pSdb);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pSdb->filelock);
|
(void)taosThreadMutexUnlock(&pSdb->filelock);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -404,7 +404,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
||||||
code = sdbWriteFileHead(pSdb, pFile);
|
code = sdbWriteFileHead(pSdb, pFile);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("failed to write sdb file:%s head since %s", tmpfile, tstrerror(code));
|
mError("failed to write sdb file:%s head since %s", tmpfile, tstrerror(code));
|
||||||
taosCloseFile(&pFile);
|
(void)taosCloseFile(&pFile);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -507,7 +507,9 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCloseFile(&pFile);
|
if (taosCloseFile(&pFile) != 0) {
|
||||||
|
code = taosRenameFile(tmpfile, curfile);
|
||||||
|
}
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
code = taosRenameFile(tmpfile, curfile);
|
code = taosRenameFile(tmpfile, curfile);
|
||||||
|
@ -541,7 +543,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexLock(&pSdb->filelock);
|
(void)taosThreadMutexLock(&pSdb->filelock);
|
||||||
if (pSdb->pWal != NULL) {
|
if (pSdb->pWal != NULL) {
|
||||||
if (pSdb->sync > 0) {
|
if (pSdb->sync > 0) {
|
||||||
code = syncBeginSnapshot(pSdb->sync, pSdb->applyIndex);
|
code = syncBeginSnapshot(pSdb->sync, pSdb->applyIndex);
|
||||||
|
@ -560,7 +562,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("failed to write sdb file since %s", tstrerror(code));
|
mError("failed to write sdb file since %s", tstrerror(code));
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pSdb->filelock);
|
(void)taosThreadMutexUnlock(&pSdb->filelock);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -602,7 +604,7 @@ static void sdbCloseIter(SSdbIter *pIter) {
|
||||||
if (pIter == NULL) return;
|
if (pIter == NULL) return;
|
||||||
|
|
||||||
if (pIter->file != NULL) {
|
if (pIter->file != NULL) {
|
||||||
taosCloseFile(&pIter->file);
|
(void)taosCloseFile(&pIter->file);
|
||||||
pIter->file = NULL;
|
pIter->file = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -624,18 +626,18 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *ter
|
||||||
char datafile[PATH_MAX] = {0};
|
char datafile[PATH_MAX] = {0};
|
||||||
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
||||||
|
|
||||||
taosThreadMutexLock(&pSdb->filelock);
|
(void)taosThreadMutexLock(&pSdb->filelock);
|
||||||
int64_t commitIndex = pSdb->commitIndex;
|
int64_t commitIndex = pSdb->commitIndex;
|
||||||
int64_t commitTerm = pSdb->commitTerm;
|
int64_t commitTerm = pSdb->commitTerm;
|
||||||
int64_t commitConfig = pSdb->commitConfig;
|
int64_t commitConfig = pSdb->commitConfig;
|
||||||
if (taosCopyFile(datafile, pIter->name) < 0) {
|
if (taosCopyFile(datafile, pIter->name) < 0) {
|
||||||
taosThreadMutexUnlock(&pSdb->filelock);
|
(void)taosThreadMutexUnlock(&pSdb->filelock);
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
mError("failed to copy sdb file %s to %s since %s", datafile, pIter->name, tstrerror(code));
|
mError("failed to copy sdb file %s to %s since %s", datafile, pIter->name, tstrerror(code));
|
||||||
sdbCloseIter(pIter);
|
sdbCloseIter(pIter);
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pSdb->filelock);
|
(void)taosThreadMutexUnlock(&pSdb->filelock);
|
||||||
|
|
||||||
pIter->file = taosOpenFile(pIter->name, TD_FILE_READ);
|
pIter->file = taosOpenFile(pIter->name, TD_FILE_READ);
|
||||||
if (pIter->file == NULL) {
|
if (pIter->file == NULL) {
|
||||||
|
@ -725,7 +727,10 @@ int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply, int64_t index, i
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCloseFile(&pIter->file);
|
if (taosCloseFile(&pIter->file) != 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
pIter->file = NULL;
|
pIter->file = NULL;
|
||||||
|
|
||||||
char datafile[PATH_MAX] = {0};
|
char datafile[PATH_MAX] = {0};
|
||||||
|
|
|
@ -176,7 +176,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
if (terrno == 0) terrno = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
|
if (terrno == 0) terrno = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
|
||||||
code = terrno;
|
code = terrno;
|
||||||
taosHashRemove(hash, pRow->pObj, keySize);
|
(void)taosHashRemove(hash, pRow->pObj, keySize);
|
||||||
sdbFreeRow(pSdb, pRow, false);
|
sdbFreeRow(pSdb, pRow, false);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
sdbUnLock(pSdb, type);
|
sdbUnLock(pSdb, type);
|
||||||
|
@ -239,10 +239,10 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
|
||||||
SSdbRow *pOldRow = *ppOldRow;
|
SSdbRow *pOldRow = *ppOldRow;
|
||||||
pOldRow->status = pRaw->status;
|
pOldRow->status = pRaw->status;
|
||||||
|
|
||||||
atomic_add_fetch_32(&pOldRow->refCount, 1);
|
(void)atomic_add_fetch_32(&pOldRow->refCount, 1);
|
||||||
sdbPrintOper(pSdb, pOldRow, "delete");
|
sdbPrintOper(pSdb, pOldRow, "delete");
|
||||||
|
|
||||||
taosHashRemove(hash, pOldRow->pObj, keySize);
|
TAOS_CHECK_RETURN(taosHashRemove(hash, pOldRow->pObj, keySize));
|
||||||
pSdb->tableVer[pOldRow->type]++;
|
pSdb->tableVer[pOldRow->type]++;
|
||||||
sdbUnLock(pSdb, type);
|
sdbUnLock(pSdb, type);
|
||||||
|
|
||||||
|
@ -309,7 +309,7 @@ void *sdbAcquireAll(SSdb *pSdb, ESdbType type, const void *pKey, bool onlyReady)
|
||||||
SSdbRow *pRow = *ppRow;
|
SSdbRow *pRow = *ppRow;
|
||||||
switch (pRow->status) {
|
switch (pRow->status) {
|
||||||
case SDB_STATUS_READY:
|
case SDB_STATUS_READY:
|
||||||
atomic_add_fetch_32(&pRow->refCount, 1);
|
(void)atomic_add_fetch_32(&pRow->refCount, 1);
|
||||||
pRet = pRow->pObj;
|
pRet = pRow->pObj;
|
||||||
sdbPrintOper(pSdb, pRow, "acquire");
|
sdbPrintOper(pSdb, pRow, "acquire");
|
||||||
break;
|
break;
|
||||||
|
@ -327,7 +327,7 @@ void *sdbAcquireAll(SSdb *pSdb, ESdbType type, const void *pKey, bool onlyReady)
|
||||||
if (pRet == NULL) {
|
if (pRet == NULL) {
|
||||||
if (!onlyReady) {
|
if (!onlyReady) {
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
atomic_add_fetch_32(&pRow->refCount, 1);
|
(void)atomic_add_fetch_32(&pRow->refCount, 1);
|
||||||
pRet = pRow->pObj;
|
pRet = pRow->pObj;
|
||||||
sdbPrintOper(pSdb, pRow, "acquire");
|
sdbPrintOper(pSdb, pRow, "acquire");
|
||||||
}
|
}
|
||||||
|
@ -395,7 +395,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_add_fetch_32(&pRow->refCount, 1);
|
(void)atomic_add_fetch_32(&pRow->refCount, 1);
|
||||||
sdbPrintOper(pSdb, pRow, "fetch");
|
sdbPrintOper(pSdb, pRow, "fetch");
|
||||||
*ppObj = pRow->pObj;
|
*ppObj = pRow->pObj;
|
||||||
break;
|
break;
|
||||||
|
@ -423,7 +423,7 @@ void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStat
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_add_fetch_32(&pRow->refCount, 1);
|
(void)atomic_add_fetch_32(&pRow->refCount, 1);
|
||||||
sdbPrintOper(pSdb, pRow, "fetch");
|
sdbPrintOper(pSdb, pRow, "fetch");
|
||||||
*ppObj = pRow->pObj;
|
*ppObj = pRow->pObj;
|
||||||
*status = pRow->status;
|
*status = pRow->status;
|
||||||
|
|
|
@ -42,7 +42,7 @@ void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc) {
|
||||||
// remove attached object such as trans
|
// remove attached object such as trans
|
||||||
SdbDeleteFp deleteFp = pSdb->deleteFps[pRow->type];
|
SdbDeleteFp deleteFp = pSdb->deleteFps[pRow->type];
|
||||||
if (deleteFp != NULL) {
|
if (deleteFp != NULL) {
|
||||||
(*deleteFp)(pSdb, pRow->pObj, callFunc);
|
(void)(*deleteFp)(pSdb, pRow->pObj, callFunc);
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbPrintOper(pSdb, pRow, "free");
|
sdbPrintOper(pSdb, pRow, "free");
|
||||||
|
|
Loading…
Reference in New Issue