other: add debug logs
This commit is contained in:
parent
f35baac58a
commit
8033635717
|
@ -523,8 +523,24 @@ static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
|
|||
SCellVal sVal = {0};
|
||||
if (pRowCol->colId == pDataCol->colId) {
|
||||
if (tdGetTpRowValOfCol(&sVal, pRow, pBitmap, pRowCol->type, pRowCol->offset - sizeof(TSKEY), rcol - 1) < 0) {
|
||||
ASSERT(0);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (pRowCol->colId == 2) {
|
||||
ASSERT(sVal.valType == 0);
|
||||
int32_t val = *(int32_t *)sVal.val;
|
||||
ASSERT(val == 7);
|
||||
} else if (pRowCol->colId == 3) {
|
||||
ASSERT(sVal.valType == 0);
|
||||
int64_t val = *(int64_t *)sVal.val;
|
||||
ASSERT(val == 77777);
|
||||
} else if (pRowCol->colId == 4) {
|
||||
ASSERT(sVal.valType == 0);
|
||||
int16_t val = *(int16_t *)sVal.val;
|
||||
ASSERT(val == 777);
|
||||
}
|
||||
|
||||
tdAppendValToDataCol(pDataCol, sVal.valType, sVal.val, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
||||
isMerge);
|
||||
++dcol;
|
||||
|
@ -535,6 +551,7 @@ static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
|
|||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
||||
isMerge);
|
||||
++dcol;
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
#if 0
|
||||
|
|
|
@ -0,0 +1,178 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "sma.h"
|
||||
#include "tsdb.h"
|
||||
|
||||
#define SMA_STORAGE_MINUTES_MAX 86400
|
||||
#define SMA_STORAGE_MINUTES_DAY 1440
|
||||
#define SMA_STORAGE_SPLIT_FACTOR 14400 // least records in tsma file
|
||||
|
||||
/**
|
||||
* @brief Judge the tsma file split days
|
||||
*
|
||||
* @param pCfg
|
||||
* @param pCont
|
||||
* @param contLen
|
||||
* @param days unit is minute
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
|
||||
SDecoder coder = {0};
|
||||
tDecoderInit(&coder, pCont, contLen);
|
||||
|
||||
STSma tsma = {0};
|
||||
if (tDecodeSVCreateTSmaReq(&coder, &tsma) < 0) {
|
||||
terrno = TSDB_CODE_MSG_DECODE_ERROR;
|
||||
goto _err;
|
||||
}
|
||||
STsdbCfg *pTsdbCfg = &pCfg->tsdbCfg;
|
||||
int64_t sInterval = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_SECOND);
|
||||
if (sInterval <= 0) {
|
||||
*days = pTsdbCfg->days;
|
||||
return 0;
|
||||
}
|
||||
int64_t records = pTsdbCfg->days * 60 / sInterval;
|
||||
if (records >= SMA_STORAGE_SPLIT_FACTOR) {
|
||||
*days = pTsdbCfg->days;
|
||||
} else {
|
||||
int64_t mInterval = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_MINUTE);
|
||||
int64_t daysPerFile = mInterval * SMA_STORAGE_MINUTES_DAY * 2;
|
||||
|
||||
if (daysPerFile > SMA_STORAGE_MINUTES_MAX) {
|
||||
*days = SMA_STORAGE_MINUTES_MAX;
|
||||
} else {
|
||||
*days = (int32_t)daysPerFile;
|
||||
}
|
||||
|
||||
if (*days < pTsdbCfg->days) {
|
||||
*days = pTsdbCfg->days;
|
||||
}
|
||||
}
|
||||
tDecoderClear(&coder);
|
||||
return 0;
|
||||
_err:
|
||||
tDecoderClear(&coder);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief create tsma meta and result stable
|
||||
*
|
||||
* @param pSma
|
||||
* @param version
|
||||
* @param pMsg
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
|
||||
SSmaCfg *pCfg = (SSmaCfg *)pMsg;
|
||||
|
||||
if (TD_VID(pSma->pVnode) == pCfg->dstVgId) {
|
||||
// create tsma meta in dstVgId
|
||||
if (metaCreateTSma(SMA_META(pSma), version, pCfg) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// create stable to save tsma result in dstVgId
|
||||
SVCreateStbReq pReq = {0};
|
||||
pReq.name = pCfg->dstTbName;
|
||||
pReq.suid = pCfg->dstTbUid;
|
||||
pReq.schemaRow = pCfg->schemaRow;
|
||||
pReq.schemaTag = pCfg->schemaTag;
|
||||
|
||||
if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Insert/Update Time-range-wise SMA data.
|
||||
*
|
||||
* @param pSma
|
||||
* @param msg
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||
const SArray *pDataBlocks = (const SArray *)msg;
|
||||
// TODO: destroy SSDataBlocks(msg)
|
||||
if (!pDataBlocks) {
|
||||
terrno = TSDB_CODE_TSMA_INVALID_PTR;
|
||||
smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is NULL", SMA_VID(pSma));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pDataBlocks) <= 0) {
|
||||
terrno = TSDB_CODE_TSMA_INVALID_PARA;
|
||||
smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is empty", SMA_VID(pSma));
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE) != 0) {
|
||||
terrno = TSDB_CODE_TSMA_INIT_FAILED;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SSmaEnv *pEnv = SMA_TSMA_ENV(pSma);
|
||||
SSmaStat *pStat = NULL;
|
||||
SSmaStatItem *pItem = NULL;
|
||||
|
||||
if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) {
|
||||
terrno = TSDB_CODE_TSMA_INVALID_STAT;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
tdRefSmaStat(pSma, pStat);
|
||||
pItem = &pStat->tsmaStatItem;
|
||||
|
||||
ASSERT(pItem);
|
||||
|
||||
if (!pItem->pTSma) {
|
||||
// cache smaMeta
|
||||
STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
|
||||
if (!pTSma) {
|
||||
terrno = TSDB_CODE_TSMA_NO_INDEX_IN_META;
|
||||
smaWarn("vgId:%d, tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid, tstrerror(terrno));
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
pItem->pTSma = pTSma;
|
||||
}
|
||||
|
||||
STSma *pTSma = pItem->pTSma;
|
||||
|
||||
ASSERT(pTSma->indexUid == indexUid);
|
||||
|
||||
SMetaReader mr = {0};
|
||||
|
||||
const char *dbName = "testDb";
|
||||
if (metaGetTableEntryByName(&mr, dbName) != 0) {
|
||||
smaDebug("vgId:%d, tsma no table testTb exists for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid, tstrerror(terrno));
|
||||
SVCreateStbReq pReq = {0};
|
||||
pReq.name = dbName;
|
||||
pReq.suid = pTSma->dstTbUid;
|
||||
pReq.schemaRow = pCfg->schemaRow;
|
||||
pReq.schemaTag = pCfg->schemaTag;
|
||||
}
|
||||
|
||||
SSubmitReq *pSubmitReq = NULL;
|
||||
buildSubmitReqFromDataBlock(&pSubmitReq, (const SArray *)msg, NULL, pItem->pTSma->dstVgId,
|
||||
pItem->pTSma->dstTbUid);
|
||||
|
||||
tdUnRefSmaStat(pSma, pStat);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
|
@ -15,6 +15,8 @@
|
|||
|
||||
#include "vnd.h"
|
||||
|
||||
int32_t gForceCommit = 1;
|
||||
|
||||
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
|
@ -182,7 +184,8 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
}
|
||||
|
||||
// commit if need
|
||||
if (vnodeShouldCommit(pVnode)) {
|
||||
if ((gForceCommit == 1) || vnodeShouldCommit(pVnode)) {
|
||||
gForceCommit = 0;
|
||||
vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
|
||||
// commit current change
|
||||
vnodeCommit(pVnode);
|
||||
|
@ -719,6 +722,7 @@ static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const
|
|||
}
|
||||
|
||||
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
gForceCommit = 1;
|
||||
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
|
||||
SSubmitRsp submitRsp = {0};
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
|
|
Loading…
Reference in New Issue