enh: coverage for sma
This commit is contained in:
parent
11bf7511e5
commit
eb301a1d61
|
@ -30,7 +30,6 @@ static int32_t tdRsmaStartExecutor(const SSma *pSma);
|
|||
static int32_t tdRsmaStopExecutor(const SSma *pSma);
|
||||
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||
static void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||
static void *tdFreeTSmaStat(STSmaStat *pStat);
|
||||
static void tdDestroyRSmaStat(void *pRSmaStat);
|
||||
|
||||
/**
|
||||
|
@ -63,19 +62,15 @@ int32_t smaInit() {
|
|||
|
||||
int32_t type = (8 == POINTER_BYTES) ? TSDB_DATA_TYPE_UBIGINT : TSDB_DATA_TYPE_UINT;
|
||||
smaMgmt.refHash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_ENTRY_LOCK);
|
||||
if (!smaMgmt.refHash) {
|
||||
taosCloseRef(smaMgmt.rsetId);
|
||||
atomic_store_8(&smaMgmt.inited, 0);
|
||||
smaError("failed to init sma tmr hanle since %s", terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
// init fetch timer handle
|
||||
smaMgmt.tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA");
|
||||
if (!smaMgmt.tmrHandle) {
|
||||
|
||||
if (!smaMgmt.refHash || !smaMgmt.tmrHandle) {
|
||||
taosCloseRef(smaMgmt.rsetId);
|
||||
taosHashCleanup(smaMgmt.refHash);
|
||||
smaMgmt.refHash = NULL;
|
||||
if (smaMgmt.refHash) {
|
||||
taosHashCleanup(smaMgmt.refHash);
|
||||
smaMgmt.refHash = NULL;
|
||||
}
|
||||
atomic_store_8(&smaMgmt.inited, 0);
|
||||
smaError("failed to init sma tmr handle since %s", terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
|
@ -143,10 +138,6 @@ static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
|
|||
}
|
||||
|
||||
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
|
||||
if (!ppEnv) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (!(*ppEnv)) {
|
||||
if (tdNewSmaEnv(pSma, smaType, ppEnv) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -196,10 +187,6 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
|
|||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (ASSERTS(pSmaStat != NULL, "pSmaStat is NULL")) {
|
||||
terrno = TSDB_CODE_RSMA_INVALID_ENV;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (*pSmaStat) { // no lock
|
||||
return code; // success, return directly
|
||||
|
@ -255,16 +242,13 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
|
|||
taosInitRWLatch(RSMA_FS_LOCK(pRSmaStat));
|
||||
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
||||
// TODO
|
||||
} else {
|
||||
ASSERTS(0, "unknown smaType:%" PRIi8, smaType);
|
||||
code = TSDB_CODE_APP_ERROR;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
_exit:
|
||||
if (code) {
|
||||
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
|
||||
}
|
||||
smaDebug("vgId:%d, %s succeed, type:%" PRIi8, SMA_VID(pSma), __func__, smaType);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -277,12 +261,6 @@ static void tdDestroyTSmaStat(STSmaStat *pStat) {
|
|||
}
|
||||
}
|
||||
|
||||
static void *tdFreeTSmaStat(STSmaStat *pStat) {
|
||||
tdDestroyTSmaStat(pStat);
|
||||
taosMemoryFreeClear(pStat);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void tdDestroyRSmaStat(void *pRSmaStat) {
|
||||
if (pRSmaStat) {
|
||||
SRSmaStat *pStat = (SRSmaStat *)pRSmaStat;
|
||||
|
@ -354,10 +332,7 @@ static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
|
|||
smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", vid, refId, smaMgmt.rsetId);
|
||||
}
|
||||
} else {
|
||||
ASSERTS(0, "unknown smaType:%" PRIi8, smaType);
|
||||
terrno = TSDB_CODE_APP_ERROR;
|
||||
smaError("%s failed at line %d since %s", __func__, __LINE__, terrstr());
|
||||
return -1;
|
||||
smaError("%s failed at line %d since Unknown type", __func__, __LINE__);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
|
@ -375,11 +350,6 @@ int32_t tdLockSma(SSma *pSma) {
|
|||
}
|
||||
|
||||
int32_t tdUnLockSma(SSma *pSma) {
|
||||
if (ASSERTS(SMA_LOCKED(pSma), "pSma %p is not locked:%d", pSma, pSma->locked)) {
|
||||
terrno = TSDB_CODE_APP_ERROR;
|
||||
smaError("vgId:%d, failed to unlock since %s", SMA_VID(pSma), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
pSma->locked = false;
|
||||
int code = taosThreadMutexUnlock(&pSma->mutex);
|
||||
|
|
|
@ -26,30 +26,20 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
|
|||
static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
||||
|
||||
int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) {
|
||||
smaError("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
|
||||
}
|
||||
int32_t code = tdProcessTSmaInsertImpl(pSma, indexUid, msg);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tdProcessTSmaCreate(SSma *pSma, int64_t version, const char *msg) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t code = tdProcessTSmaCreateImpl(pSma, version, msg);
|
||||
|
||||
if ((code = tdProcessTSmaCreateImpl(pSma, version, msg)) < 0) {
|
||||
smaWarn("vgId:%d, create tsma failed since %s", SMA_VID(pSma), tstrerror(terrno));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if ((code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days)) < 0) {
|
||||
smaWarn("vgId:%d, get tsma days failed since %s", pCfg->vgId, tstrerror(terrno));
|
||||
}
|
||||
smaDebug("vgId:%d, get tsma days %d", pCfg->vgId, *days);
|
||||
int32_t code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -63,19 +53,22 @@ int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *
|
|||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SDecoder coder = {0};
|
||||
tDecoderInit(&coder, pCont, contLen);
|
||||
|
||||
STSma tsma = {0};
|
||||
if (tDecodeSVCreateTSmaReq(&coder, &tsma) < 0) {
|
||||
terrno = TSDB_CODE_MSG_DECODE_ERROR;
|
||||
goto _err;
|
||||
code = TSDB_CODE_MSG_DECODE_ERROR;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
STsdbCfg *pTsdbCfg = &pCfg->tsdbCfg;
|
||||
int64_t sInterval = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_SECOND);
|
||||
if (sInterval <= 0) {
|
||||
*days = pTsdbCfg->days;
|
||||
return 0;
|
||||
goto _exit;
|
||||
}
|
||||
int64_t records = pTsdbCfg->days * 60 / sInterval;
|
||||
if (records >= SMA_STORAGE_SPLIT_FACTOR) {
|
||||
|
@ -94,11 +87,14 @@ static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t c
|
|||
*days = pTsdbCfg->days;
|
||||
}
|
||||
}
|
||||
_exit:
|
||||
if (code) {
|
||||
smaWarn("vgId:%d, failed at line %d to get tsma days %d since %s", pCfg->vgId, lino, *days, tstrerror(code));
|
||||
} else {
|
||||
smaDebug("vgId:%d, succeed to get tsma days %d", pCfg->vgId, *days);
|
||||
}
|
||||
tDecoderClear(&coder);
|
||||
return 0;
|
||||
_err:
|
||||
tDecoderClear(&coder);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -157,6 +153,8 @@ _exit:
|
|||
int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema,
|
||||
SSchemaWrapper *pTagSchemaWrapper, bool createTb, int64_t suid, const char *stbFullName,
|
||||
SBatchDeleteReq *pDeleteReq, void **ppData, int32_t *pLen) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
void *pBuf = NULL;
|
||||
int32_t len = 0;
|
||||
SSubmitReq2 *pReq = NULL;
|
||||
|
@ -166,21 +164,14 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
|
|||
|
||||
int32_t sz = taosArrayGetSize(pBlocks);
|
||||
|
||||
if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) {
|
||||
goto _end;
|
||||
}
|
||||
tagArray = taosArrayInit(1, sizeof(STagVal));
|
||||
createTbArray = taosArrayInit(sz, POINTER_BYTES);
|
||||
pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
|
||||
pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData));
|
||||
|
||||
if (!(createTbArray = taosArrayInit(sz, POINTER_BYTES))) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
|
||||
goto _end;
|
||||
if(!tagArray || !createTbArray || !pReq || !pReq->aSubmitTbData) {
|
||||
code = terrno == TSDB_CODE_SUCCESS ? TSDB_CODE_OUT_OF_MEMORY : terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// create table req
|
||||
|
@ -194,8 +185,8 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
|
|||
}
|
||||
|
||||
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
};
|
||||
|
||||
// don't move to the end of loop as to destroy in the end of func when error occur
|
||||
|
@ -224,8 +215,8 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
|
|||
STag *pTag = NULL;
|
||||
tTagNew(tagArray, 1, false, &pTag);
|
||||
if (pTag == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
pCreateTbReq->ctb.pTag = (uint8_t *)pTag;
|
||||
|
||||
|
@ -260,7 +251,8 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
|
|||
SSubmitTbData tbData = {0};
|
||||
|
||||
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow *)))) {
|
||||
goto _end;
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
tbData.suid = suid;
|
||||
tbData.uid = 0; // uid is assigned by vnode
|
||||
|
@ -273,7 +265,8 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
|
|||
|
||||
if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
|
||||
taosArrayDestroy(tbData.aRowP);
|
||||
goto _end;
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < rows; ++j) {
|
||||
|
@ -299,9 +292,9 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
|
|||
}
|
||||
}
|
||||
SRow *pRow = NULL;
|
||||
if ((terrno = tRowBuild(pVals, (STSchema *)pTSchema, &pRow)) < 0) {
|
||||
if ((code = tRowBuild(pVals, (STSchema *)pTSchema, &pRow)) < 0) {
|
||||
tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
|
||||
goto _end;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
taosArrayPush(tbData.aRowP, &pRow);
|
||||
}
|
||||
|
@ -310,25 +303,27 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
|
|||
}
|
||||
|
||||
// encode
|
||||
tEncodeSize(tEncodeSSubmitReq2, pReq, len, terrno);
|
||||
if (TSDB_CODE_SUCCESS == terrno) {
|
||||
tEncodeSize(tEncodeSSubmitReq2, pReq, len, code);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SEncoder encoder;
|
||||
len += sizeof(SSubmitReq2Msg);
|
||||
pBuf = rpcMallocCont(len);
|
||||
if (NULL == pBuf) {
|
||||
goto _end;
|
||||
if (!(pBuf = rpcMallocCont(len))) {
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
((SSubmitReq2Msg *)pBuf)->header.vgId = TD_VID(pVnode);
|
||||
((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len);
|
||||
((SSubmitReq2Msg *)pBuf)->version = htobe64(1);
|
||||
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
|
||||
if (tEncodeSSubmitReq2(&encoder, pReq) < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
/*vError("failed to encode submit req since %s", terrstr());*/
|
||||
tEncoderClear(&encoder);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
}
|
||||
_end:
|
||||
_exit:
|
||||
taosArrayDestroy(createTbArray);
|
||||
taosArrayDestroy(tagArray);
|
||||
taosArrayDestroy(pVals);
|
||||
|
@ -337,14 +332,15 @@ _end:
|
|||
taosMemoryFree(pReq);
|
||||
}
|
||||
|
||||
if (terrno != 0) {
|
||||
if (code) {
|
||||
rpcFreeCont(pBuf);
|
||||
taosArrayDestroy(pDeleteReq->deleteReqs);
|
||||
return terrno;
|
||||
smaWarn("vgId:%d, failed at line %d since %s", TD_VID(pVnode), lino, tstrerror(code));
|
||||
} else {
|
||||
if (ppData) *ppData = pBuf;
|
||||
if (pLen) *pLen = len;
|
||||
}
|
||||
if (ppData) *ppData = pBuf;
|
||||
if (pLen) *pLen = len;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -384,13 +380,13 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
|
|||
terrno = 0;
|
||||
STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
|
||||
if (!pTSma) {
|
||||
code = terrno ? code : TSDB_CODE_TSMA_INVALID_PTR;
|
||||
code = terrno ? terrno : TSDB_CODE_TSMA_INVALID_PTR;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
pTsmaStat->pTSma = pTSma;
|
||||
pTsmaStat->pTSchema = metaGetTbTSchema(SMA_META(pSma), pTSma->dstTbUid, -1, 1);
|
||||
if (!pTsmaStat->pTSchema) {
|
||||
code = terrno ? code : TSDB_CODE_TSMA_INVALID_PTR;
|
||||
code = terrno ? terrno : TSDB_CODE_TSMA_INVALID_PTR;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue