tsdb/write: make last tier writable to mem

This commit is contained in:
Minglei Jin 2023-10-27 15:51:52 +08:00
parent 6874fa2a3d
commit 8c6f452f09
2 changed files with 11 additions and 11 deletions

View File

@ -80,8 +80,8 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2; TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision]; TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision];
int32_t size = taosArrayGetSize(pMsg->aSubmitTbData); int32_t size = taosArrayGetSize(pMsg->aSubmitTbData);
/*
int32_t nlevel = tfsGetLevel(pTsdb->pVnode->pTfs); int32_t nlevel = tfsGetLevel(pTsdb->pVnode->pTfs);
if (nlevel > 1 && tsS3Enabled) { if (nlevel > 1 && tsS3Enabled) {
if (nlevel == 3) { if (nlevel == 3) {
minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep1; minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep1;
@ -89,7 +89,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep0; minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep0;
} }
} }
*/
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SSubmitTbData *pData = TARRAY_GET_ELEM(pMsg->aSubmitTbData, i); SSubmitTbData *pData = TARRAY_GET_ELEM(pMsg->aSubmitTbData, i);
if (pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { if (pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {

View File

@ -13,13 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "audit.h"
#include "tencode.h" #include "tencode.h"
#include "tmsg.h" #include "tmsg.h"
#include "vnd.h" #include "vnd.h"
#include "vndCos.h" #include "vndCos.h"
#include "vnode.h" #include "vnode.h"
#include "vnodeInt.h" #include "vnodeInt.h"
#include "audit.h"
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
@ -262,8 +262,9 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
now *= 1000000; now *= 1000000;
} }
int32_t nlevel = tfsGetLevel(pVnode->pTfs);
int32_t keep = pVnode->config.tsdbCfg.keep2; int32_t keep = pVnode->config.tsdbCfg.keep2;
/*
int32_t nlevel = tfsGetLevel(pVnode->pTfs);
if (nlevel > 1 && tsS3Enabled) { if (nlevel > 1 && tsS3Enabled) {
if (nlevel == 3) { if (nlevel == 3) {
keep = pVnode->config.tsdbCfg.keep1; keep = pVnode->config.tsdbCfg.keep1;
@ -271,7 +272,7 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
keep = pVnode->config.tsdbCfg.keep0; keep = pVnode->config.tsdbCfg.keep0;
} }
} }
*/
TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * keep; TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * keep;
TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision]; TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
@ -584,7 +585,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
} }
} break; } break;
case TDMT_VND_STREAM_TASK_RESET: { case TDMT_VND_STREAM_TASK_RESET: {
if (pVnode->restored/* && vnodeIsLeader(pVnode)*/) { if (pVnode->restored /* && vnodeIsLeader(pVnode)*/) {
tqProcessTaskResetReq(pVnode->pTq, pMsg); tqProcessTaskResetReq(pVnode->pTq, pMsg);
} }
} break; } break;
@ -947,7 +948,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
taosArrayPush(rsp.pArray, &cRsp); taosArrayPush(rsp.pArray, &cRsp);
if(tsEnableAuditCreateTable){ if (tsEnableAuditCreateTable) {
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId; int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
SName name = {0}; SName name = {0};
@ -1460,7 +1461,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP); int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
SRow **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP); SRow **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
for (int32_t iRow = 0; iRow < nRow; ++iRow) { for (int32_t iRow = 0; iRow < nRow; ++iRow) {
if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) { if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver); vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);