more code

This commit is contained in:
Hongze Cheng 2023-04-06 14:27:18 +08:00
parent c6d9a3a481
commit fd16a6b3b7
4 changed files with 76 additions and 40 deletions

View File

@ -262,6 +262,11 @@ static int32_t open_committer(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCom
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pCommitter->aFileOp = taosArrayInit(10, sizeof(struct SFileOp));
if (pCommitter->aFileOp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
// start loop
pCommitter->nextKey = pTsdb->imem->minKey; // TODO
@ -277,7 +282,12 @@ _exit:
static int32_t close_committer(SCommitter *pCommiter, int32_t eno) {
int32_t code = 0;
// TODO
int32_t lino;
// code = tsdbFSBegin(pCommiter->pTsdb, pCommiter->aFileOp);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
return code;
}
@ -327,49 +337,49 @@ _exit:
return code;
}
int32_t tsdbCommitCommit(STsdb *pTsdb) {
int32_t code = 0;
int32_t lino = 0;
SMemTable *pMemTable = pTsdb->imem;
// int32_t tsdbCommitCommit(STsdb *pTsdb) {
// int32_t code = 0;
// int32_t lino = 0;
// SMemTable *pMemTable = pTsdb->imem;
// lock
taosThreadRwlockWrlock(&pTsdb->rwLock);
// // lock
// taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbFSCommit(pTsdb);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
TSDB_CHECK_CODE(code, lino, _exit);
}
// code = tsdbFSCommit(pTsdb);
// if (code) {
// taosThreadRwlockUnlock(&pTsdb->rwLock);
// TSDB_CHECK_CODE(code, lino, _exit);
// }
pTsdb->imem = NULL;
// pTsdb->imem = NULL;
// unlock
taosThreadRwlockUnlock(&pTsdb->rwLock);
if (pMemTable) {
tsdbUnrefMemTable(pMemTable, NULL, true);
}
// // unlock
// taosThreadRwlockUnlock(&pTsdb->rwLock);
// if (pMemTable) {
// tsdbUnrefMemTable(pMemTable, NULL, true);
// }
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d, tsdb finish commit", TD_VID(pTsdb->pVnode));
}
return code;
}
// _exit:
// if (code) {
// tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
// } else {
// tsdbInfo("vgId:%d, tsdb finish commit", TD_VID(pTsdb->pVnode));
// }
// return code;
// }
int32_t tsdbCommitAbort(STsdb *pTsdb) {
int32_t code = 0;
int32_t lino = 0;
// int32_t tsdbCommitAbort(STsdb *pTsdb) {
// int32_t code = 0;
// int32_t lino = 0;
code = tsdbFSRollback(pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
// code = tsdbFSRollback(pTsdb);
// TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d, tsdb rollback commit", TD_VID(pTsdb->pVnode));
}
return code;
}
// _exit:
// if (code) {
// tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
// } else {
// tsdbInfo("vgId:%d, tsdb rollback commit", TD_VID(pTsdb->pVnode));
// }
// return code;
// }

View File

@ -26,8 +26,11 @@ extern "C" {
struct STFileSystem;
/* Exposed APIs */
// open/close
int32_t tsdbOpenFileSystem(STsdb *pTsdb, struct STFileSystem **ppFS, int8_t rollback);
int32_t tsdbCloseFileSystem(struct STFileSystem **ppFS);
// txn
// int32_t tsdb
/* Exposed Structs */
struct STFileSystem {

View File

@ -12,3 +12,17 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbSttFReader.h"
int32_t tsdbSttFReaderOpen(const struct SSttFReaderConf *pConf, struct SSttFReader **ppReader) {
int32_t code = 0;
// TODO
return code;
}
int32_t tsdbSttFReaderClose(struct SSttFReader *pReader) {
int32_t code = 0;
// TODO
return code;
}

View File

@ -16,15 +16,24 @@
#ifndef _TD_TSDB_STT_FILE_READER_H
#define _TD_TSDB_STT_FILE_READER_H
#include "tsdb.h"
#ifdef __cplusplus
extern "C" {
#endif
/* Exposed Handle */
struct SSttFReader;
struct SSttFReaderConf;
/* Exposed APIs */
int32_t tsdbSttFReaderOpen(const struct SSttFReaderConf *pConf, struct SSttFReader **ppReader);
int32_t tsdbSttFReaderClose(struct SSttFReader *pReader);
/* Exposed Structs */
struct SSttFReaderConf {
// TODO
};
#ifdef __cplusplus
}