vnode/tcs: use tcs instead of s3 interface
This commit is contained in:
parent
1c7573a9ce
commit
3905c94f03
|
@ -119,6 +119,7 @@ if (${BUILD_CONTRIB})
|
||||||
vnode
|
vnode
|
||||||
PUBLIC "inc"
|
PUBLIC "inc"
|
||||||
PUBLIC "src/inc"
|
PUBLIC "src/inc"
|
||||||
|
PUBLIC "${TD_SOURCE_DIR}/include/libs/tcs"
|
||||||
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
|
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
|
||||||
PUBLIC "${TD_SOURCE_DIR}/include/libs/crypt"
|
PUBLIC "${TD_SOURCE_DIR}/include/libs/crypt"
|
||||||
PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode"
|
PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode"
|
||||||
|
@ -129,6 +130,7 @@ else()
|
||||||
vnode
|
vnode
|
||||||
PUBLIC "inc"
|
PUBLIC "inc"
|
||||||
PUBLIC "src/inc"
|
PUBLIC "src/inc"
|
||||||
|
PUBLIC "${TD_SOURCE_DIR}/include/libs/tcs"
|
||||||
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
|
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
|
||||||
PUBLIC "${TD_SOURCE_DIR}/include/libs/crypt"
|
PUBLIC "${TD_SOURCE_DIR}/include/libs/crypt"
|
||||||
PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode"
|
PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode"
|
||||||
|
@ -164,6 +166,7 @@ target_link_libraries(
|
||||||
PUBLIC tdb
|
PUBLIC tdb
|
||||||
PUBLIC audit
|
PUBLIC audit
|
||||||
PUBLIC crypt
|
PUBLIC crypt
|
||||||
|
PUBLIC tcs
|
||||||
|
|
||||||
# PUBLIC bdb
|
# PUBLIC bdb
|
||||||
# PUBLIC scalar
|
# PUBLIC scalar
|
||||||
|
|
|
@ -12,8 +12,8 @@
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
#include "cos.h"
|
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
|
#include "tcs.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "tsdbDataFileRW.h"
|
#include "tsdbDataFileRW.h"
|
||||||
#include "tsdbIter.h"
|
#include "tsdbIter.h"
|
||||||
|
@ -1251,7 +1251,8 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
|
if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
|
||||||
SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
SLastCol lastColTmp = {
|
||||||
|
.rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||||
if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
|
if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
|
||||||
tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
|
tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
|
@ -1698,8 +1699,7 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
||||||
if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
|
if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
|
||||||
code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
|
code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||||
tstrerror(code));
|
|
||||||
taosMemoryFreeClear(pToFree);
|
taosMemoryFreeClear(pToFree);
|
||||||
TAOS_CHECK_EXIT(code);
|
TAOS_CHECK_EXIT(code);
|
||||||
}
|
}
|
||||||
|
@ -3503,7 +3503,7 @@ static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) {
|
||||||
|
|
||||||
int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
|
int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
|
||||||
|
|
||||||
TAOS_CHECK_RETURN(s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, 0, ppBlock));
|
TAOS_CHECK_RETURN(tcsGetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, 0, ppBlock));
|
||||||
|
|
||||||
tsdbTrace("block:%p load from s3", *ppBlock);
|
tsdbTrace("block:%p load from s3", *ppBlock);
|
||||||
|
|
||||||
|
@ -3600,4 +3600,4 @@ void tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *
|
||||||
(void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
|
(void)taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
|
||||||
|
|
||||||
tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
|
tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdbFile2.h"
|
#include "tsdbFile2.h"
|
||||||
#include "cos.h"
|
#include "tcs.h"
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
// to_json
|
// to_json
|
||||||
|
@ -318,7 +318,7 @@ static void tsdbTFileObjRemoveLC(STFileObj *fobj, bool remove_all) {
|
||||||
}
|
}
|
||||||
*(dot + 1) = 0;
|
*(dot + 1) = 0;
|
||||||
|
|
||||||
s3DeleteObjectsByPrefix(object_name_prefix);
|
tcsDeleteObjectsByPrefix(object_name_prefix);
|
||||||
|
|
||||||
// remove local last chunk file
|
// remove local last chunk file
|
||||||
dot = strrchr(lc_path, '.');
|
dot = strrchr(lc_path, '.');
|
||||||
|
|
|
@ -13,8 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "cos.h"
|
|
||||||
#include "crypt.h"
|
#include "crypt.h"
|
||||||
|
#include "tcs.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "tsdbDef.h"
|
#include "tsdbDef.h"
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
@ -391,7 +391,7 @@ static int32_t tsdbReadFileBlock(STsdbFD *pFD, int64_t offset, int64_t size, boo
|
||||||
|
|
||||||
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name_prefix), "%d.data", chunkno);
|
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name_prefix), "%d.data", chunkno);
|
||||||
|
|
||||||
code = s3GetObjectBlock(object_name_prefix, cOffset, nRead, check, &pBlock);
|
code = tcsGetObjectBlock(object_name_prefix, cOffset, nRead, check, &pBlock);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
memcpy(buf + n, pBlock, nRead);
|
memcpy(buf + n, pBlock, nRead);
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "cos.h"
|
#include "tcs.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "tsdbFS2.h"
|
#include "tsdbFS2.h"
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
@ -426,35 +426,6 @@ static int32_t tsdbS3FidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int32_t s3Kee
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbCopyFileS3(SRTNer *rtner, const STFileObj *from, const STFile *to) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
char fname[TSDB_FILENAME_LEN];
|
|
||||||
TdFilePtr fdFrom = NULL;
|
|
||||||
// TdFilePtr fdTo = NULL;
|
|
||||||
|
|
||||||
tsdbTFileName(rtner->tsdb, to, fname);
|
|
||||||
|
|
||||||
fdFrom = taosOpenFile(from->fname, TD_FILE_READ);
|
|
||||||
if (fdFrom == NULL) {
|
|
||||||
TAOS_CHECK_GOTO(terrno, &lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
char *object_name = taosDirEntryBaseName(fname);
|
|
||||||
TAOS_CHECK_GOTO(s3PutObjectFromFile2(from->fname, object_name, 1), &lino, _exit);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
|
|
||||||
tstrerror(code));
|
|
||||||
}
|
|
||||||
if (taosCloseFile(&fdFrom) != 0) {
|
|
||||||
tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int64_t size, int64_t chunksize) {
|
static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int64_t size, int64_t chunksize) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -519,7 +490,7 @@ static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int
|
||||||
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name_prefix), "%d.data", cn);
|
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name_prefix), "%d.data", cn);
|
||||||
int64_t c_offset = chunksize * (cn - fobj->f->lcn);
|
int64_t c_offset = chunksize * (cn - fobj->f->lcn);
|
||||||
|
|
||||||
TAOS_CHECK_GOTO(s3PutObjectFromFileOffset(fname, object_name_prefix, c_offset, chunksize), &lino, _exit);
|
TAOS_CHECK_GOTO(tcsPutObjectFromFileOffset(fname, object_name_prefix, c_offset, chunksize), &lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
// copy last chunk
|
// copy last chunk
|
||||||
|
@ -618,7 +589,7 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64
|
||||||
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name_prefix), "%d.data", cn);
|
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name_prefix), "%d.data", cn);
|
||||||
int64_t c_offset = chunksize * (cn - 1);
|
int64_t c_offset = chunksize * (cn - 1);
|
||||||
|
|
||||||
TAOS_CHECK_GOTO(s3PutObjectFromFileOffset(fobj->fname, object_name_prefix, c_offset, chunksize), &lino, _exit);
|
TAOS_CHECK_GOTO(tcsPutObjectFromFileOffset(fobj->fname, object_name_prefix, c_offset, chunksize), &lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
// copy last chunk
|
// copy last chunk
|
||||||
|
@ -741,8 +712,6 @@ _exit:
|
||||||
int32_t tsdbAsyncS3Migrate(STsdb *tsdb, int64_t now) {
|
int32_t tsdbAsyncS3Migrate(STsdb *tsdb, int64_t now) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
extern int8_t tsS3EnabledCfg;
|
|
||||||
|
|
||||||
int32_t expired = grantCheck(TSDB_GRANT_OBJECT_STORAGE);
|
int32_t expired = grantCheck(TSDB_GRANT_OBJECT_STORAGE);
|
||||||
if (expired && tsS3Enabled) {
|
if (expired && tsS3Enabled) {
|
||||||
tsdbWarn("s3 grant expired: %d", expired);
|
tsdbWarn("s3 grant expired: %d", expired);
|
||||||
|
|
|
@ -13,8 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "cos.h"
|
|
||||||
#include "sync.h"
|
#include "sync.h"
|
||||||
|
#include "tcs.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
|
@ -327,7 +327,7 @@ void vnodeDestroy(int32_t vgId, const char *path, STfs *pTfs, int32_t nodeId) {
|
||||||
if (nodeId > 0 && vgId > 0 /*&& nlevel > 1*/ && tsS3Enabled) {
|
if (nodeId > 0 && vgId > 0 /*&& nlevel > 1*/ && tsS3Enabled) {
|
||||||
char vnode_prefix[TSDB_FILENAME_LEN];
|
char vnode_prefix[TSDB_FILENAME_LEN];
|
||||||
snprintf(vnode_prefix, TSDB_FILENAME_LEN, "%d/v%df", nodeId, vgId);
|
snprintf(vnode_prefix, TSDB_FILENAME_LEN, "%d/v%df", nodeId, vgId);
|
||||||
s3DeleteObjectsByPrefix(vnode_prefix);
|
tcsDeleteObjectsByPrefix(vnode_prefix);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue