diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 2a29d9b7c7..fd9ff3eb65 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1776,12 +1776,10 @@ typedef struct { } SDDropTopicReq; typedef struct { - float xFilesFactor; - int32_t delay; - int32_t qmsg1Len; - int32_t qmsg2Len; - char* qmsg1; // pAst1:qmsg1:SRetention1 => trigger aggr task1 - char* qmsg2; // pAst2:qmsg2:SRetention2 => trigger aggr task2 + int64_t maxdelay[2]; + int64_t watermark[2]; + int32_t qmsgLen[2]; + char* qmsg[2]; // pAst:qmsg:SRetention => trigger aggr task1/2 } SRSmaParam; int32_t tEncodeSRSmaParam(SEncoder* pCoder, const SRSmaParam* pRSmaParam); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 92eda0c5e0..269c92a670 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -185,7 +185,7 @@ char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPR bool tsStartUdfd = true; // internal -int32_t tsTransPullupInterval = 6; +int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c3c96972b7..d078d22cdf 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4273,39 +4273,34 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { } int32_t tEncodeSRSmaParam(SEncoder *pCoder, const SRSmaParam *pRSmaParam) { - if (tEncodeFloat(pCoder, pRSmaParam->xFilesFactor) < 0) return -1; - if (tEncodeI32v(pCoder, pRSmaParam->delay) < 0) return -1; - if (tEncodeI32v(pCoder, pRSmaParam->qmsg1Len) < 0) return -1; - if (tEncodeI32v(pCoder, pRSmaParam->qmsg2Len) < 0) return -1; - if (pRSmaParam->qmsg1Len > 0) { - if (tEncodeBinary(pCoder, pRSmaParam->qmsg1, (uint64_t)pRSmaParam->qmsg1Len) < 0) // qmsg1Len contains len of '\0' - return -1; - } - if (pRSmaParam->qmsg2Len > 0) { - if (tEncodeBinary(pCoder, pRSmaParam->qmsg2, (uint64_t)pRSmaParam->qmsg2Len) < 0) // qmsg2Len contains len of '\0' - return -1; + for (int32_t i = 0; i < 2; ++i) { + if (tEncodeI64v(pCoder, pRSmaParam->maxdelay[i]) < 0) return -1; + if (tEncodeI64v(pCoder, pRSmaParam->watermark[i]) < 0) return -1; + if (tEncodeI32v(pCoder, pRSmaParam->qmsgLen[i]) < 0) return -1; + if (pRSmaParam->qmsgLen[i] > 0) { + if (tEncodeBinary(pCoder, pRSmaParam->qmsg[i], (uint64_t)pRSmaParam->qmsgLen[i]) < + 0) // qmsgLen contains len of '\0' + return -1; + } } return 0; } int32_t tDecodeSRSmaParam(SDecoder *pCoder, SRSmaParam *pRSmaParam) { - if (tDecodeFloat(pCoder, &pRSmaParam->xFilesFactor) < 0) return -1; - if (tDecodeI32v(pCoder, &pRSmaParam->delay) < 0) return -1; - if (tDecodeI32v(pCoder, &pRSmaParam->qmsg1Len) < 0) return -1; - if (tDecodeI32v(pCoder, &pRSmaParam->qmsg2Len) < 0) return -1; - if (pRSmaParam->qmsg1Len > 0) { - uint64_t len; - if (tDecodeBinaryAlloc(pCoder, (void **)&pRSmaParam->qmsg1, &len) < 0) return -1; // qmsg1Len contains len of '\0' - } else { - pRSmaParam->qmsg1 = NULL; - } - if (pRSmaParam->qmsg2Len > 0) { - uint64_t len; - if (tDecodeBinaryAlloc(pCoder, (void **)&pRSmaParam->qmsg2, &len) < 0) return -1; // qmsg2Len contains len of '\0' - } else { - pRSmaParam->qmsg2 = NULL; + for (int32_t i = 0; i < 2; ++i) { + if (tDecodeI64v(pCoder, &pRSmaParam->maxdelay[i]) < 0) return -1; + if (tDecodeI64v(pCoder, &pRSmaParam->watermark[i]) < 0) return -1; + if (tDecodeI32v(pCoder, &pRSmaParam->qmsgLen[i]) < 0) return -1; + if (pRSmaParam->qmsgLen[i] > 0) { + uint64_t len; + if (tDecodeBinaryAlloc(pCoder, (void **)&pRSmaParam->qmsg[i], &len) < 0) + return -1; // qmsgLen contains len of '\0' + } else { + pRSmaParam->qmsg[i] = NULL; + } } + return 0; } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 8963f6be39..987b01b96a 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -341,8 +341,8 @@ typedef struct { int32_t colVer; int32_t smaVer; int32_t nextColId; - float xFilesFactor; - int32_t delay; + int64_t watermark[2]; + int64_t maxdelay[2]; int32_t ttl; int32_t numOfColumns; int32_t numOfTags; diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h index 8e816d2dd6..15d2c6cd5e 100644 --- a/source/dnode/mnode/impl/inc/mndScheduler.h +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -30,7 +30,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, - int64_t watermark, double filesFactor); + int64_t watermark); int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 3bd9a9128d..645634d7f3 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -43,7 +43,7 @@ static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) { } int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, - int64_t watermark, double filesFactor) { + int64_t watermark) { SNode* pAst = NULL; SQueryPlan* pPlan = NULL; terrno = TSDB_CODE_SUCCESS; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 96dc79adbc..3e50ea8262 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -89,8 +89,10 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { SDB_SET_INT32(pRaw, dataPos, pStb->tagVer, _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->colVer, _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, _OVER) - SDB_SET_INT32(pRaw, dataPos, (int32_t)(pStb->xFilesFactor * 10000), _OVER) - SDB_SET_INT32(pRaw, dataPos, pStb->delay, _OVER) + SDB_SET_INT64(pRaw, dataPos, pStb->maxdelay[0], _OVER) + SDB_SET_INT64(pRaw, dataPos, pStb->maxdelay[1], _OVER) + SDB_SET_INT64(pRaw, dataPos, pStb->watermark[0], _OVER) + SDB_SET_INT64(pRaw, dataPos, pStb->watermark[1], _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->ttl, _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, _OVER) @@ -168,10 +170,10 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &pStb->tagVer, _OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->colVer, _OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, _OVER) - int32_t xFilesFactor = 0; - SDB_GET_INT32(pRaw, dataPos, &xFilesFactor, _OVER) - pStb->xFilesFactor = xFilesFactor / 10000.0f; - SDB_GET_INT32(pRaw, dataPos, &pStb->delay, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pStb->maxdelay[0], _OVER) + SDB_GET_INT64(pRaw, dataPos, &pStb->maxdelay[1], _OVER) + SDB_GET_INT64(pRaw, dataPos, &pStb->watermark[0], _OVER) + SDB_GET_INT64(pRaw, dataPos, &pStb->watermark[1], _OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->ttl, _OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, _OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, _OVER) @@ -399,18 +401,18 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt req.schemaTag.pSchema = pStb->pTags; if (req.rollup) { - req.pRSmaParam.xFilesFactor = pStb->xFilesFactor; - req.pRSmaParam.delay = pStb->delay; + req.pRSmaParam.maxdelay[0] = pStb->maxdelay[0]; + req.pRSmaParam.maxdelay[1] = pStb->maxdelay[1]; if (pStb->ast1Len > 0) { - if (mndConvertRsmaTask(&req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, pStb->pAst1, pStb->uid, - STREAM_TRIGGER_AT_ONCE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { - return NULL; + if (mndConvertRsmaTask(&req.pRSmaParam.qmsg[0], &req.pRSmaParam.qmsgLen[0], pStb->pAst1, pStb->uid, + STREAM_TRIGGER_WINDOW_CLOSE, req.pRSmaParam.watermark[0]) < 0) { + goto _err; } } if (pStb->ast2Len > 0) { - if (mndConvertRsmaTask(&req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, pStb->pAst2, pStb->uid, - STREAM_TRIGGER_AT_ONCE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { - return NULL; + if (mndConvertRsmaTask(&req.pRSmaParam.qmsg[1], &req.pRSmaParam.qmsgLen[1], pStb->pAst2, pStb->uid, + STREAM_TRIGGER_WINDOW_CLOSE, req.pRSmaParam.watermark[1]) < 0) { + goto _err; } } } @@ -418,17 +420,15 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt int32_t ret = 0; tEncodeSize(tEncodeSVCreateStbReq, &req, contLen, ret); if (ret < 0) { - return NULL; + goto _err; } contLen += sizeof(SMsgHead); SMsgHead *pHead = taosMemoryMalloc(contLen); if (pHead == NULL) { - taosMemoryFreeClear(req.pRSmaParam.qmsg1); - taosMemoryFreeClear(req.pRSmaParam.qmsg2); terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + goto _err; } pHead->contLen = htonl(contLen); @@ -438,17 +438,19 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead)); if (tEncodeSVCreateStbReq(&encoder, &req) < 0) { taosMemoryFreeClear(pHead); - taosMemoryFreeClear(req.pRSmaParam.qmsg1); - taosMemoryFreeClear(req.pRSmaParam.qmsg2); tEncoderClear(&encoder); - return NULL; + goto _err; } tEncoderClear(&encoder); *pContLen = contLen; - taosMemoryFreeClear(req.pRSmaParam.qmsg1); - taosMemoryFreeClear(req.pRSmaParam.qmsg2); + taosMemoryFreeClear(req.pRSmaParam.qmsg[0]); + taosMemoryFreeClear(req.pRSmaParam.qmsg[1]); return pHead; +_err: + taosMemoryFreeClear(req.pRSmaParam.qmsg[0]); + taosMemoryFreeClear(req.pRSmaParam.qmsg[1]); + return NULL; } static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { @@ -670,8 +672,10 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat pDst->tagVer = 1; pDst->colVer = 1; pDst->nextColId = 1; - // pDst->xFilesFactor = pCreate->xFilesFactor; - // pDst->delay = pCreate->delay; + pDst->maxdelay[0] = pCreate->delay1; + pDst->maxdelay[1] = pCreate->delay2; + pDst->watermark[0] = pCreate->watermark1; + pDst->watermark[1] = pCreate->watermark2; pDst->ttl = pCreate->ttl; pDst->numOfColumns = pCreate->numOfColumns; pDst->numOfTags = pCreate->numOfTags; @@ -897,7 +901,7 @@ static int32_t mndUpdateStbCommentAndTTL(const SStbObj *pOld, SStbObj *pNew, cha return -1; } memcpy(pNew->comment, pComment, commentLen + 1); - } else if(commentLen == 0){ + } else if (commentLen == 0) { pNew->commentLen = 0; } @@ -1849,7 +1853,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(comment, pStb->comment); colDataAppend(pColInfo, numOfRows, comment, false); - } else if(pStb->commentLen == 0) { + } else if (pStb->commentLen == 0) { char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(comment, ""); colDataAppend(pColInfo, numOfRows, comment, false); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 3a023bcece..07f65b2a90 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -68,8 +68,9 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM mndTransExecute(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans); } - +#if 0 sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA); +#endif } } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 31a955b030..30e46af03c 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -804,7 +804,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { sendRsp = true; } } else { - if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 2) { + if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 3) { if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR; sendRsp = true; } @@ -1127,6 +1127,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) } if (code == 0) { + pTrans->failedTimes = 0; pTrans->lastAction = action; pTrans->lastMsgType = 0; pTrans->lastErrorNo = 0; @@ -1430,8 +1431,7 @@ void mndTransPullup(SMnode *pMnode) { mndReleaseTrans(pMnode, pTrans); } - // todo, set to SDB_WRITE_DELTA - sdbWriteFile(pMnode->pSdb, 0); + sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA); taosArrayDestroy(pArray); } diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index 1bd09aef63..3b1c4000a8 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -37,7 +37,7 @@ extern "C" { #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} // clang-format on -#define SDB_WRITE_DELTA 100 +#define SDB_WRITE_DELTA 20 #define SDB_GET_VAL(pData, dataPos, val, pos, func, type) \ { \ diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index c44f1670c3..fbf66da632 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -161,9 +161,11 @@ static int32_t sdbCreateDir(SSdb *pSdb) { } void sdbSetApplyInfo(SSdb *pSdb, int64_t index, int64_t term, int64_t config) { - mTrace("mnode apply info changed, from index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", to index:%" PRId64 +#if 1 + mTrace("mnode apply info changed from index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " to index:%" PRId64 " term:%" PRId64 " config:%" PRId64, pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, index, term, config); +#endif pSdb->applyIndex = index; pSdb->applyTerm = term; pSdb->applyConfig = config; @@ -173,7 +175,9 @@ void sdbGetCommitInfo(SSdb *pSdb, int64_t *index, int64_t *term, int64_t *config *index = pSdb->commitIndex; *term = pSdb->commitTerm; *config = pSdb->commitConfig; +#if 0 mTrace("mnode current info, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, *index, *term, *config); +#endif } diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 1e77022d04..902e5e1bcc 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -32,11 +32,12 @@ extern "C" { #define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on -typedef struct SSmaEnv SSmaEnv; -typedef struct SSmaStat SSmaStat; -typedef struct SSmaStatItem SSmaStatItem; -typedef struct SSmaKey SSmaKey; -typedef struct SRSmaInfo SRSmaInfo; +typedef struct SSmaEnv SSmaEnv; +typedef struct SSmaStat SSmaStat; +typedef struct SSmaStatItem SSmaStatItem; +typedef struct SSmaKey SSmaKey; +typedef struct SRSmaInfo SRSmaInfo; +typedef struct SRSmaInfoItem SRSmaInfoItem; struct SSmaEnv { TdThreadRwlock lock; diff --git a/source/dnode/vnode/src/sma/sma.c b/source/dnode/vnode/src/sma/sma.c index b5c55a2f83..12f93f9400 100644 --- a/source/dnode/vnode/src/sma/sma.c +++ b/source/dnode/vnode/src/sma/sma.c @@ -15,6 +15,8 @@ #include "sma.h" +// functions for external invocation + // TODO: Who is responsible for resource allocate and release? int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg) { int32_t code = TSDB_CODE_SUCCESS; @@ -45,6 +47,9 @@ int32_t smaGetTSmaDays(SVnodeCfg* pCfg, void* pCont, uint32_t contLen, int32_t* return code; } + +// functions for internal invocation + #if 0 /** diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index f71c222772..a80af2b202 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -208,7 +208,6 @@ int32_t tdUnLockSma(SSma *pSma) { int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) { SSmaEnv *pEnv = NULL; - // return if already init switch (smaType) { case TSDB_SMA_TYPE_TIME_RANGE: if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_TSMA_ENV(pSma)))) { @@ -244,3 +243,34 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) { return TSDB_CODE_SUCCESS; }; + +int32_t smaTimerInit(void **timer, int8_t *initFlag, const char *label) { + int8_t old; + while (1) { + old = atomic_val_compare_exchange_8(initFlag, 0, 2); + if (old != 2) break; + } + + if (old == 0) { + *timer = taosTmrInit(10000, 100, 10000, label); + if (!(*timer)) { + atomic_store_8(initFlag, 0); + return -1; + } + atomic_store_8(initFlag, 1); + } + return 0; +} + +void smaTimerCleanUp(void *timer, int8_t *initFlag) { + int8_t old; + while (1) { + old = atomic_val_compare_exchange_8(initFlag, 1, 2); + if (old != 2) break; + } + + if (old == 1) { + taosTmrCleanUp(timer); + atomic_store_8(initFlag, 0); + } +} diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 0c372dfa70..1f18f6cb87 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -14,14 +14,61 @@ */ #include "sma.h" +#include "tstream.h" static FORCE_INLINE int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); -static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, qTaskInfo_t *taskInfo, - STSchema *pTSchema, tb_uid_t suid, int8_t level); +static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem, + tb_uid_t suid, int8_t level); + +#define SET_RSMA_INFO_ITEM_PARAMS(__idx, __level) \ + if (param->qmsg[__idx]) { \ + pRSmaInfo->items[__idx].pRsmaInfo = pRSmaInfo; \ + pRSmaInfo->items[__idx].taskInfo = qCreateStreamExecTaskInfo(param->qmsg[0], &handle); \ + if (!pRSmaInfo->items[__idx].taskInfo) { \ + goto _err; \ + } \ + pRSmaInfo->items[__idx].triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; \ + if (param->maxdelay[__idx] < 1) { \ + int64_t msInterval = \ + convertTimeFromPrecisionToUnit(pRetention[__level].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); \ + pRSmaInfo->items[__idx].maxDelay = msInterval; \ + } else { \ + pRSmaInfo->items[__idx].maxDelay = param->maxdelay[__idx]; \ + } \ + if (pRSmaInfo->items[__idx].maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) { \ + pRSmaInfo->items[__idx].maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; \ + } \ + pRSmaInfo->items[__idx].level = TSDB_RETENTION_L##__level; \ + pRSmaInfo->items[__idx].tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA"); \ + if (!pRSmaInfo->items[__idx].tmrHandle) { \ + goto _err; \ + } \ + } + +struct SRSmaInfoItem { + SRSmaInfo *pRsmaInfo; + void *taskInfo; // qTaskInfo_t + void *tmrHandle; + tmr_h tmrId; + int8_t level; + int8_t tmrInitFlag; + int8_t triggerStatus; // TASK_TRIGGER_STATUS__IN_ACTIVE/TASK_TRIGGER_STATUS__ACTIVE + int32_t maxDelay; +}; + +typedef struct { + int64_t suid; + SRSmaInfoItem *pItem; + SSma *pSma; + STSchema *pTSchema; +} SRSmaTriggerParam; struct SRSmaInfo { - void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t + STSchema *pTSchema; + SSma *pSma; + int64_t suid; + SRSmaInfoItem items[TSDB_RETENTION_L2]; }; static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle) { @@ -33,11 +80,20 @@ static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle) { } void *tdFreeRSmaInfo(SRSmaInfo *pInfo) { - for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) { - if (pInfo->taskInfo[i]) { - tdFreeTaskHandle(pInfo->taskInfo[i]); + if (pInfo) { + for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) { + SRSmaInfoItem *pItem = &pInfo->items[i]; + if (pItem->taskInfo) { + tdFreeTaskHandle(pItem->taskInfo); + } + if (pItem->tmrHandle) { + taosTmrCleanUp(pItem->tmrHandle); + } } + taosMemoryFree(pInfo->pTSchema); + taosMemoryFree(pInfo); } + return NULL; } @@ -69,20 +125,20 @@ static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SA return TSDB_CODE_FAILED; } - if (pRSmaInfo->taskInfo[0] && (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[0], tbUids, true) != 0)) { + if (pRSmaInfo->items[0].taskInfo && (qUpdateQualifiedTableId(pRSmaInfo->items[0].taskInfo, tbUids, true) < 0)) { smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno)); return TSDB_CODE_FAILED; } else { smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma), - pRSmaInfo->taskInfo[0], *suid, *(int64_t *)taosArrayGet(tbUids, 0)); + pRSmaInfo->items[0].taskInfo, *suid, *(int64_t *)taosArrayGet(tbUids, 0)); } - if (pRSmaInfo->taskInfo[1] && (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[1], tbUids, true) != 0)) { + if (pRSmaInfo->items[1].taskInfo && (qUpdateQualifiedTableId(pRSmaInfo->items[1].taskInfo, tbUids, true) < 0)) { smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno)); return TSDB_CODE_FAILED; } else { smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma), - pRSmaInfo->taskInfo[1], *suid, *(int64_t *)taosArrayGet(tbUids, 0)); + pRSmaInfo->items[1].taskInfo, *suid, *(int64_t *)taosArrayGet(tbUids, 0)); } return TSDB_CODE_SUCCESS; @@ -144,12 +200,12 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui ASSERT(ppStore != NULL); if (!(*ppStore)) { - if (tdUidStoreInit(ppStore) != 0) { + if (tdUidStoreInit(ppStore) < 0) { return TSDB_CODE_FAILED; } } - if (tdUidStorePut(*ppStore, suid, &uid) != 0) { + if (tdUidStorePut(*ppStore, suid, &uid) < 0) { *ppStore = tdUidStoreFree(*ppStore); return TSDB_CODE_FAILED; } @@ -172,11 +228,11 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { return TSDB_CODE_SUCCESS; } - SMeta *pMeta = pVnode->pMeta; - SMsgCb *pMsgCb = &pVnode->msgCb; + SMeta *pMeta = pVnode->pMeta; + SMsgCb *pMsgCb = &pVnode->msgCb; SRSmaParam *param = &pReq->pRSmaParam; - if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) { + if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) { smaWarn("vgId:%d, no qmsg1/qmsg2 for rollup stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); return TSDB_CODE_SUCCESS; } @@ -192,10 +248,12 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t)); if (pRSmaInfo) { + ASSERT(0); // TODO: free original pRSmaInfo is exists abnormally smaWarn("vgId:%d, rsma info already exists for stb: %s, %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); return TSDB_CODE_SUCCESS; } + // from write queue: single thead pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo)); if (!pRSmaInfo) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -204,9 +262,8 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { STqReadHandle *pReadHandle = tqInitSubmitMsgScanner(pMeta); if (!pReadHandle) { - taosMemoryFree(pRSmaInfo); terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + goto _err; } SReadHandle handle = { @@ -216,32 +273,33 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { .vnode = pVnode, }; - if (param->qmsg1) { - pRSmaInfo->taskInfo[0] = qCreateStreamExecTaskInfo(param->qmsg1, &handle); - if (!pRSmaInfo->taskInfo[0]) { - taosMemoryFree(pRSmaInfo); - taosMemoryFree(pReadHandle); - return TSDB_CODE_FAILED; - } + STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), pReq->suid, -1); + if (!pTSchema) { + terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; + goto _err; } + pRSmaInfo->pTSchema = pTSchema; + pRSmaInfo->pSma = pSma; + pRSmaInfo->suid = pReq->suid; - if (param->qmsg2) { - pRSmaInfo->taskInfo[1] = qCreateStreamExecTaskInfo(param->qmsg2, &handle); - if (!pRSmaInfo->taskInfo[1]) { - taosMemoryFree(pRSmaInfo); - taosMemoryFree(pReadHandle); - return TSDB_CODE_FAILED; - } - } + SRetention *pRetention = SMA_RETENTION(pSma); + STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma); + + SET_RSMA_INFO_ITEM_PARAMS(0, 1); + SET_RSMA_INFO_ITEM_PARAMS(1, 2); if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_FAILED; + goto _err; } else { smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), pReq->suid); } return TSDB_CODE_SUCCESS; +_err: + tdFreeRSmaInfo(pRSmaInfo); + taosMemoryFree(pReadHandle); + return TSDB_CODE_FAILED; } /** @@ -291,12 +349,12 @@ static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; } - if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)) != 0) { + if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)) < 0) { return TSDB_CODE_FAILED; } } } else { - if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0) != 0) { + if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0) < 0) { return TSDB_CODE_FAILED; } } @@ -367,22 +425,15 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { return 0; } -static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, qTaskInfo_t *taskInfo, - STSchema *pTSchema, tb_uid_t suid, int8_t level) { - SArray *pResult = NULL; +static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) { + SArray *pResult = NULL; + SRSmaInfo *pRSmaInfo = pItem->pRsmaInfo; + SSma *pSma = pRSmaInfo->pSma; - if (!taskInfo) { - smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid); - return TSDB_CODE_SUCCESS; - } - - smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, taskInfo, suid); - - qSetStreamInput(taskInfo, pMsg, inputType, true); while (1) { SSDataBlock *output = NULL; uint64_t ts; - if (qExecTask(taskInfo, &output, &ts) < 0) { + if (qExecTask(pItem->taskInfo, &output, &ts) < 0) { ASSERT(false); } if (!output) { @@ -402,16 +453,16 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 if (taosArrayGetSize(pResult) > 0) { #if 0 char flag[10] = {0}; - snprintf(flag, 10, "level %" PRIi8, level); + snprintf(flag, 10, "level %" PRIi8, pItem->level); blockDebugShowData(pResult, flag); #endif - STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2); + STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2); SSubmitReq *pReq = NULL; - if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) { + if (buildSubmitReqFromDataBlock(&pReq, pResult, pRSmaInfo->pTSchema, SMA_VID(pSma), pRSmaInfo->suid) < 0) { taosArrayDestroy(pResult); return TSDB_CODE_FAILED; } - + if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) { taosArrayDestroy(pResult); taosMemoryFreeClear(pReq); @@ -420,10 +471,63 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 taosMemoryFreeClear(pReq); } else { - smaDebug("vgId:%d, no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno)); + smaDebug("vgId:%d, no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), pItem->level, tstrerror(terrno)); + } + + if (blkType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { + atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE); } taosArrayDestroy(pResult); + return 0; +} + +/** + * @brief trigger to get rsma result + * + * @param param + * @param tmrId + */ +static void rsmaTriggerByTimer(void *param, void *tmrId) { + // SRSmaTriggerParam *pParam = (SRSmaTriggerParam *)param; + // SRSmaInfoItem *pItem = pParam->pItem; + SRSmaInfoItem *pItem = param; + + if (atomic_load_8(&pItem->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) { + smaTrace("level %" PRIi8 " status is active for tb suid:%" PRIi64, pItem->level, pItem->pRsmaInfo->suid); + SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; + + atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE); + qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_DATA_TYPE_SSDATA_BLOCK, false); + + tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SSDATA_BLOCK); + } else { + smaTrace("level %" PRIi8 " status is inactive for tb suid:%" PRIi64, pItem->level, pItem->pRsmaInfo->suid); + } + + // taosTmrReset(rsmaTriggerByTimer, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId); +} + +static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, + tb_uid_t suid, int8_t level) { + if (!pItem || !pItem->taskInfo) { + smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid); + return TSDB_CODE_SUCCESS; + } + + smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, + pItem->taskInfo, suid); + + // inputType = STREAM_DATA_TYPE_SUBMIT_BLOCK(1) + if (qSetStreamInput(pItem->taskInfo, pMsg, inputType, true) < 0) { + smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); + return TSDB_CODE_FAILED; + } + + // SRSmaTriggerParam triggerParam = {.suid = suid, .pItem = pItem, .pSma = pSma, .pTSchema = pTSchema}; + tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SUBMIT_BLOCK); + atomic_store_8(&pItem->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE); + taosTmrReset(rsmaTriggerByTimer, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId); return TSDB_CODE_SUCCESS; } @@ -441,24 +545,18 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { - smaDebug("vgId:%d, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); + smaDebug("vgId:%d, return as no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); return TSDB_CODE_SUCCESS; } - if (!pRSmaInfo->taskInfo[0]) { - smaDebug("vgId:%d, no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid); + + if (!pRSmaInfo->items[0].taskInfo) { + smaDebug("vgId:%d, return as no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid); return TSDB_CODE_SUCCESS; } if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { - // TODO: cache STSchema - STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1); - if (!pTSchema) { - terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; - return TSDB_CODE_FAILED; - } - tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, TSDB_RETENTION_L1); - tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, TSDB_RETENTION_L2); - taosMemoryFree(pTSchema); + tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[0], suid, TSDB_RETENTION_L1); + tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[1], suid, TSDB_RETENTION_L2); } return TSDB_CODE_SUCCESS; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index c097e2f929..f3595ebfb0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -106,7 +106,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp int32_t len; int32_t ret; - vError("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), + vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); pVnode->state.applied = version; @@ -345,7 +345,10 @@ static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *p goto _err; } - tdProcessRSmaCreate(pVnode, &req); + if (tdProcessRSmaCreate(pVnode, &req) < 0) { + pRsp->code = terrno; + goto _err; + } tDecoderClear(&coder); return 0; diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 07686893db..1117be5db0 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -114,7 +114,7 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset); -void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols); +void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn); void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow); SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index b7c7102143..802f9ea5b5 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -75,12 +75,18 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) { // The length of bitmap is decided by number of rows of this data block, and the length of each column data is // recorded in the first segment, next to the struct header static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { - int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots); - + int32_t numOfCols = 0; + SNode* pNode; + FOREACH(pNode, pHandle->pSchema->pSlots) { + SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; + if (pSlotDesc->output) { + ++numOfCols; + } + } SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; pEntry->compressed = (int8_t)needCompress(pInput->pData, numOfCols); pEntry->numOfRows = pInput->pData->info.rows; - pEntry->numOfCols = pInput->pData->info.numOfCols; + pEntry->numOfCols = numOfCols; pEntry->dataLen = 0; pBuf->useSize = sizeof(SDataCacheEntry); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 99139be409..aadab4f22a 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -193,9 +193,9 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = {{0}}; SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i); - // if (!pDescNode->output) { // todo disable it temporarily - // continue; - // } +// if (!pDescNode->output) { // todo disable it temporarily +// continue; +// } idata.info.type = pDescNode->dataType.type; idata.info.bytes = pDescNode->dataType.bytes; @@ -235,7 +235,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo terrno = code; return code; } else { - qDebug("sucess to get tableIds, size: %d, suid: %" PRIu64 "", (int)taosArrayGetSize(res), tableUid); + qDebug("success to get tableIds, size: %d, suid: %" PRIu64 "", (int)taosArrayGetSize(res), tableUid); } for (int i = 0; i < taosArrayGetSize(res); i++) { @@ -319,7 +319,14 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod continue; } - SColMatchInfo* info = taosArrayGet(pList, pNode->slotId); + SColMatchInfo* info = NULL; + for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) { + info = taosArrayGet(pList, j); + if (info->targetSlotId == pNode->slotId) { + break; + } + } + if (pNode->output) { (*numOfOutputCols) += 1; } else { @@ -578,14 +585,15 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, } // NOTE: sources columns are more than the destination SSDatablock columns. -void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols) { +// doFilter in table scan needs every column even its output is false +void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn) { size_t numOfSrcCols = taosArrayGetSize(pCols); int32_t i = 0, j = 0; while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) { SColumnInfoData* p = taosArrayGet(pCols, i); SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, j); - if (!pmInfo->output) { + if (!outputEveryColumn && !pmInfo->output) { j++; continue; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c99b3c1058..cf72dfd796 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -41,12 +41,18 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pInfo->assignBlockUid = assignUid; // the block type can not be changed in the streamscan operators +#if 0 if (pInfo->blockType == 0) { pInfo->blockType = type; } else if (pInfo->blockType != type) { ASSERT(0); return TSDB_CODE_QRY_APP_ERROR; } +#endif + // rollup sma, the same qTaskInfo is used to insert data by SubmitReq and fetch result by SSDataBlock + if (pInfo->blockType != type) { + pInfo->blockType = type; + } if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3828a26bc4..81ca0f4f8c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2086,7 +2086,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo // data from mnode pRes->info.rows = numOfRows; - relocateColumnData(pRes, pColList, pBlock->pDataBlock); + relocateColumnData(pRes, pColList, pBlock->pDataBlock, false); taosArrayDestroy(pBlock->pDataBlock); taosMemoryFree(pBlock); // blockDataDestroy(pBlock); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c47e9dd4ae..e4f940d991 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -259,7 +259,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca return terrno; } - relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols); + relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true); // currently only the tbname pseudo column if (pTableScanInfo->pseudoSup.numOfExprs > 0) { @@ -1505,7 +1505,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { p->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows; - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock); + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); doFilterResult(pInfo); blockDataDestroy(p); @@ -1597,7 +1597,7 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { getPerfDbMeta(&pSysDbTableMeta, &size); p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB); - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock); + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); pInfo->pRes->info.rows = p->info.rows; blockDataDestroy(p); @@ -2079,7 +2079,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc return terrno; } - relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols); + relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true); // currently only the tbname pseudo column if (pTableScanInfo->numOfPseudoExpr > 0) { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 01fcc61cf0..076ae61460 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2562,9 +2562,6 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst) { - if (!pInput->hasResult) { - return; - } pOutput->bytes = pInput->bytes; TSKEY* tsIn = (TSKEY*)(pInput->buf + pInput->bytes); TSKEY* tsOut = (TSKEY*)(pOutput->buf + pInput->bytes); @@ -2598,7 +2595,9 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer firstLastTransferInfo(pInputInfo, pInfo, isFirstQuery); - SET_VAL(GET_RES_INFO(pCtx), 1, 1); + int32_t numOfElems = pInputInfo->hasResult ? 1 : 0; + + SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); return TSDB_CODE_SUCCESS; } @@ -2623,6 +2622,7 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getFirstLastInfoSize(pRes->bytes); char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 472d672607..f6ae027e48 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -972,6 +972,11 @@ void releaseUdfFuncHandle(char* udfName) { } int32_t cleanUpUdfs() { + int8_t initialized = atomic_load_8(&gUdfdProxy.initialized); + if (!initialized) { + return TSDB_CODE_SUCCESS; + } + uv_mutex_lock(&gUdfdProxy.udfStubsMutex); int32_t i = 0; SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index a7c25162b7..5ed49f6aae 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1082,13 +1082,89 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub return code; } +static bool eliminateProjOptMayBeOptimized(SLogicNode* pNode) { + //TODO: enable this optimization after new mechanising that map projection and targets of project node + if (NULL != pNode->pParent) { + return false; + } + + if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) { + return false; + } + + SProjectLogicNode* pProjectNode = (SProjectLogicNode*)pNode; + if (-1 != pProjectNode->limit || -1 != pProjectNode->slimit || -1 != pProjectNode->offset || -1 != pProjectNode->soffset) { + return false; + } + + SHashObj* pProjColNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + SNode* pProjection; + FOREACH(pProjection, pProjectNode->pProjections) { + SExprNode* pExprNode = (SExprNode*)pProjection; + if (QUERY_NODE_COLUMN != nodeType(pExprNode)) { + taosHashCleanup(pProjColNameHash); + return false; + } + + char* projColumnName = ((SColumnNode*)pProjection)->colName; + int32_t* pExist = taosHashGet(pProjColNameHash, projColumnName, strlen(projColumnName)); + if (NULL != pExist) { + taosHashCleanup(pProjColNameHash); + return false; + } else { + int32_t exist = 1; + taosHashPut(pProjColNameHash, projColumnName, strlen(projColumnName), &exist, sizeof(exist)); + } + } + taosHashCleanup(pProjColNameHash); + + return true; +} + +static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SProjectLogicNode* pProjectNode) { + SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProjectNode->node.pChildren, 0); + SNodeList* pNewChildTargets = nodesMakeList(); + + SNode* pProjection = NULL; + FOREACH(pProjection, pProjectNode->pProjections) { + SNode* pChildTarget = NULL; + FOREACH(pChildTarget, pChild->pTargets) { + if (strcmp(((SColumnNode*)pProjection)->colName, ((SColumnNode*)pChildTarget)->colName) == 0) { + nodesListAppend(pNewChildTargets, nodesCloneNode(pChildTarget)); + break; + } + } + } + nodesDestroyList(pChild->pTargets); + pChild->pTargets = pNewChildTargets; + + int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pProjectNode, pChild); + if (TSDB_CODE_SUCCESS == code) { + NODES_CLEAR_LIST(pProjectNode->node.pChildren); + nodesDestroyNode((SNode*)pProjectNode); + } + return code; +} + +static int32_t eliminateProjOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + SProjectLogicNode* pProjectNode = + (SProjectLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, eliminateProjOptMayBeOptimized); + + if (NULL == pProjectNode) { + return TSDB_CODE_SUCCESS; + } + + return eliminateProjOptimizeImpl(pCxt, pLogicSubplan, pProjectNode); +} + // clang-format off static const SOptimizeRule optimizeRuleSet[] = { {.pName = "OptimizeScanData", .optimizeFunc = osdOptimize}, {.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize}, {.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize}, {.pName = "SmaIndex", .optimizeFunc = smaOptimize}, - {.pName = "PartitionByTags", .optimizeFunc = partTagsOptimize} + {.pName = "PartitionByTags", .optimizeFunc = partTagsOptimize}, + {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize} }; // clang-format on diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index 7f650c7c9a..77e4e05530 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -107,6 +107,7 @@ int32_t createColumnByRewriteExpr(SNode* pExpr, SNodeList** pList) { int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew) { if (NULL == pOld->pParent) { pSubplan->pNode = (SLogicNode*)pNew; + pNew->pParent = NULL; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/test/planOptimizeTest.cpp b/source/libs/planner/test/planOptimizeTest.cpp index 84ccea668d..6b514ef2c3 100644 --- a/source/libs/planner/test/planOptimizeTest.cpp +++ b/source/libs/planner/test/planOptimizeTest.cpp @@ -52,3 +52,13 @@ TEST_F(PlanOptimizeTest, orderByPrimaryKey) { run("SELECT COUNT(*) FROM t1 INTERVAL(10S) ORDER BY _WSTARTTS DESC"); } + +TEST_F(PlanOptimizeTest, eliminateProjection) { + useDb("root", "test"); + + run("SELECT c1, sum(c3) FROM t1 GROUP BY c1"); + run("SELECT c1 FROM t1"); + run("SELECT * FROM st1"); + run("SELECT c1 FROM st1s3"); + //run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) order by 1 nulls first"); +} diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 0f95da42e2..28181d7e11 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -533,7 +533,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex atomic_store_8(&ctx->taskType, taskType); atomic_store_8(&ctx->explain, explain); - /*QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);*/ + QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); code = qStringToSubplan(qwMsg->msg, &plan); if (TSDB_CODE_SUCCESS != code) { diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 9368706378..45e71f6c0d 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -17,6 +17,7 @@ #define TDB_BTREE_ROOT 0x1 #define TDB_BTREE_LEAF 0x2 +#define TDB_BTREE_OVFL 0x4 struct SBTree { SPgno root; @@ -38,9 +39,11 @@ struct SBTree { #define TDB_BTREE_PAGE_SET_FLAGS(PAGE, flags) ((PAGE)->pData[0] = (flags)) #define TDB_BTREE_PAGE_IS_ROOT(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_ROOT) #define TDB_BTREE_PAGE_IS_LEAF(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_LEAF) +#define TDB_BTREE_PAGE_IS_OVFL(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_OVFL) #define TDB_BTREE_ASSERT_FLAG(flags) \ ASSERT(TDB_FLAG_IS(flags, TDB_BTREE_ROOT) || TDB_FLAG_IS(flags, TDB_BTREE_LEAF) || \ - TDB_FLAG_IS(flags, TDB_BTREE_ROOT | TDB_BTREE_LEAF) || TDB_FLAG_IS(flags, 0)) + TDB_FLAG_IS(flags, TDB_BTREE_ROOT | TDB_BTREE_LEAF) || TDB_FLAG_IS(flags, 0) || \ + TDB_FLAG_IS(flags, TDB_BTREE_OVFL)) #pragma pack(push, 1) typedef struct { @@ -62,10 +65,10 @@ static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2 static int tdbBtreeOpenImpl(SBTree *pBt); static int tdbBtreeInitPage(SPage *pPage, void *arg, int init); static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell, - int *szCell); -static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder); + int *szCell, TXN *pTxn, SBTree *pBt); +static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder, TXN *pTxn, SBTree *pBt); static int tdbBtreeBalance(SBTC *pBtc); -static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell); +static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *pTxn, SBTree *pBt); static int tdbBtcMoveDownward(SBTC *pBtc); static int tdbBtcMoveUpward(SBTC *pBtc); @@ -255,7 +258,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL } pCell = tdbPageGetCell(btc.pPage, btc.idx); - tdbBtreeDecodeCell(btc.pPage, pCell, &cd); + tdbBtreeDecodeCell(btc.pPage, pCell, &cd, btc.pTxn, pBt); if (ppKey) { pTKey = tdbRealloc(*ppKey, cd.kLen); @@ -281,6 +284,14 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL memcpy(*ppVal, cd.pVal, cd.vLen); } + if (TDB_CELLDECODER_FREE_KEY(&cd)) { + tdbFree(cd.pKey); + } + + if (TDB_CELLDECODER_FREE_VAL(&cd)) { + tdbFree(cd.pVal); + } + tdbBtcClose(&btc); return 0; @@ -375,6 +386,11 @@ static int tdbBtreeInitPage(SPage *pPage, void *arg, int init) { pPage->vLen = pBt->valLen; pPage->maxLocal = pBt->maxLeaf; pPage->minLocal = pBt->minLeaf; + } else if (TDB_BTREE_PAGE_IS_OVFL(pPage)) { + pPage->kLen = pBt->keyLen; + pPage->vLen = pBt->valLen; + pPage->maxLocal = tdbPageCapacity(pBt->pageSize, sizeof(SIntHdr)); + pPage->minLocal = pBt->minLocal; } else { pPage->kLen = pBt->keyLen; pPage->vLen = sizeof(SPgno); @@ -499,7 +515,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx for (int i = 0; i < nOlds; i++) { if (sIdx + i < TDB_PAGE_TOTAL_CELLS(pParent)) { pCell = tdbPageGetCell(pParent, sIdx + i); - szDivCell[i] = tdbBtreeCellSize(pParent, pCell); + szDivCell[i] = tdbBtreeCellSize(pParent, pCell, 0, NULL, NULL); pDivCell[i] = tdbOsMalloc(szDivCell[i]); memcpy(pDivCell[i], pCell, szDivCell[i]); } @@ -524,7 +540,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx for (int i = 0; i < nOlds; i++) { nCells = TDB_PAGE_TOTAL_CELLS(pParent); if (sIdx < nCells) { - tdbPageDropCell(pParent, sIdx); + tdbPageDropCell(pParent, sIdx, pTxn, pBt); } else { ((SIntHdr *)pParent->pData)->pgno = 0; } @@ -582,7 +598,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx for (;;) { pCell = tdbPageGetCell(pOlds[infoNews[iNew - 1].iPage], infoNews[iNew - 1].oIdx); - szLCell = tdbBtreeCellSize(pOlds[infoNews[iNew - 1].iPage], pCell); + szLCell = tdbBtreeCellSize(pOlds[infoNews[iNew - 1].iPage], pCell, 0, NULL, NULL); if (!childNotLeaf) { szRCell = szLCell; } else { @@ -600,7 +616,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx } pCell = tdbPageGetCell(pPage, oIdx); - szRCell = tdbBtreeCellSize(pPage, pCell); + szRCell = tdbBtreeCellSize(pPage, pCell, 0, NULL, NULL); } ASSERT(infoNews[iNew - 1].cnt > 0); @@ -687,7 +703,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx for (int oIdx = 0; oIdx < TDB_PAGE_TOTAL_CELLS(pPage); oIdx++) { pCell = tdbPageGetCell(pPage, oIdx); - szCell = tdbBtreeCellSize(pPage, pCell); + szCell = tdbBtreeCellSize(pPage, pCell, 0, NULL, NULL); ASSERT(nNewCells <= infoNews[iNew].cnt); ASSERT(iNew < nNews); @@ -703,14 +719,14 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx if (iNew == nNews - 1 && pIntHdr->pgno == 0) { pIntHdr->pgno = TDB_PAGE_PGNO(pNews[iNew]); } else { - tdbBtreeDecodeCell(pPage, pCell, &cd); + tdbBtreeDecodeCell(pPage, pCell, &cd, pTxn, pBt); // TODO: pCell here may be inserted as an overflow cell, handle it SCell *pNewCell = tdbOsMalloc(cd.kLen + 9); int szNewCell; SPgno pgno; pgno = TDB_PAGE_PGNO(pNews[iNew]); - tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&pgno, sizeof(SPgno), pNewCell, &szNewCell); + tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&pgno, sizeof(SPgno), pNewCell, &szNewCell, pTxn, pBt); tdbPageInsertCell(pParent, sIdx++, pNewCell, szNewCell, 0); tdbOsFree(pNewCell); } @@ -846,13 +862,50 @@ static int tdbBtreeBalance(SBTC *pBtc) { } // TDB_BTREE_BALANCE +static int tdbFetchOvflPage(SPager *pPager, SPgno *pPgno, SPage **ppOfp, TXN *pTxn, SBTree *pBt) { + int ret = 0; + + *pPgno = 0; + SBtreeInitPageArg iArg; + iArg.pBt = pBt; + iArg.flags = TDB_FLAG_ADD(0, TDB_BTREE_OVFL); + ret = tdbPagerFetchPage(pPager, pPgno, ppOfp, tdbBtreeInitPage, &iArg, pTxn); + if (ret < 0) { + return -1; + } + + // mark dirty + ret = tdbPagerWrite(pPager, *ppOfp); + if (ret < 0) { + ASSERT(0); + return -1; + } + + return ret; +} + +static int tdbLoadOvflPage(SPager *pPager, SPgno *pPgno, SPage **ppOfp, TXN *pTxn, SBTree *pBt) { + int ret = 0; + + SBtreeInitPageArg iArg; + iArg.pBt = pBt; + iArg.flags = TDB_FLAG_ADD(0, TDB_BTREE_OVFL); + ret = tdbPagerFetchPage(pPager, pPgno, ppOfp, tdbBtreeInitPage, &iArg, pTxn); + if (ret < 0) { + return -1; + } + + return ret; +} + // TDB_BTREE_CELL ===================== static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const void *pKey, int kLen, const void *pVal, - int vLen, int *szPayload) { - int nPayload; + int vLen, int *szPayload, TXN *pTxn, SBTree *pBt) { + int ret = 0; + int nPayload = kLen + vLen; + int maxLocal = pPage->maxLocal; - nPayload = kLen + vLen; - if (nPayload + nHeader <= pPage->maxLocal) { + if (nPayload + nHeader <= maxLocal) { // no overflow page is needed memcpy(pCell + nHeader, pKey, kLen); if (pVal) { @@ -861,18 +914,190 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const *szPayload = nPayload; return 0; - } + } else { + // handle overflow case + // calc local storage size + int minLocal = pPage->minLocal; + int surplus = minLocal + (nPayload + nHeader - minLocal) % (maxLocal - sizeof(SPgno)); + int nLocal = surplus <= maxLocal ? surplus : minLocal; - { - // TODO: handle overflow case - ASSERT(0); + //int ofpCap = tdbPageCapacity(pBt->pageSize, sizeof(SIntHdr)); + + // fetch a new ofp and make it dirty + SPgno pgno = 0; + SPage *ofp, *nextOfp; + + ret = tdbFetchOvflPage(pPage->pPager, &pgno, &ofp, pTxn, pBt); + if (ret < 0) { + return -1; + } + + // local buffer for cell + SCell *pBuf = tdbRealloc(NULL, pBt->pageSize); + if (pBuf == NULL) { + return -1; + } + + int nLeft = nPayload; + int bytes; + int lastPage = 0; + if (nLocal >= kLen + 4) { + // pack key to local + memcpy(pCell + nHeader, pKey, kLen); + nLeft -= kLen; + // pack partial val to local if any space left + if (nLocal > kLen + 4) { + memcpy(pCell + nHeader + kLen, pVal, nLocal - kLen - sizeof(SPgno)); + nLeft -= nLocal - kLen - sizeof(SPgno); + } + + // pack nextPgno + memcpy(pCell + nHeader + nPayload - nLeft, &pgno, sizeof(pgno)); + + // pack left val data to ovpages + do { + lastPage = 0; + if (nLeft <= ofp->maxLocal - sizeof(SPgno)) { + bytes = nLeft; + lastPage = 1; + } else { + bytes = ofp->maxLocal - sizeof(SPgno); + } + + // fetch next ofp if not last page + if (!lastPage) { + // fetch a new ofp and make it dirty + ret = tdbFetchOvflPage(pPage->pPager, &pgno, &nextOfp, pTxn, pBt); + if (ret < 0) { + tdbFree(pBuf); + return -1; + } + } else { + pgno = 0; + } + + memcpy(pBuf, ((SCell *)pVal) + vLen - nLeft, bytes); + memcpy(pBuf + bytes, &pgno, sizeof(pgno)); + + ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0); + if (ret < 0) { + tdbFree(pBuf); + return -1; + } + + ofp = nextOfp; + nLeft -= bytes; + } while (nLeft > 0); + } else { + int nLeftKey = kLen; + // pack partial key and nextPgno + memcpy(pCell + nHeader, pKey, nLocal - 4); + nLeft -= nLocal - 4; + nLeftKey -= nLocal -4; + + memcpy(pCell + nHeader + nLocal - 4, &pgno, sizeof(pgno)); + + int lastKeyPageSpace = 0; + // pack left key & val to ovpages + do { + // cal key to cpy + int lastKeyPage = 0; + if (nLeftKey <= ofp->maxLocal - sizeof(SPgno)) { + bytes = nLeftKey; + lastKeyPage = 1; + lastKeyPageSpace = ofp->maxLocal - sizeof(SPgno) - nLeftKey; + } else { + bytes = ofp->maxLocal - sizeof(SPgno); + } + + // cpy key + memcpy(pBuf, ((SCell *)pKey) + kLen - nLeftKey, bytes); + + if (lastKeyPage) { + if (lastKeyPageSpace >= vLen) { + memcpy(pBuf + kLen -nLeftKey, pVal, vLen); + + nLeft -= vLen; + pgno = 0; + } else { + memcpy(pBuf + kLen -nLeftKey, pVal, lastKeyPageSpace); + nLeft -= lastKeyPageSpace; + + // fetch next ofp, a new ofp and make it dirty + ret = tdbFetchOvflPage(pPage->pPager, &pgno, &nextOfp, pTxn, pBt); + if (ret < 0) { + return -1; + } + } + } else { + // fetch next ofp, a new ofp and make it dirty + ret = tdbFetchOvflPage(pPage->pPager, &pgno, &nextOfp, pTxn, pBt); + if (ret < 0) { + return -1; + } + } + + memcpy(pBuf + kLen - nLeft, &pgno, sizeof(pgno)); + + ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0); + if (ret < 0) { + return -1; + } + + ofp = nextOfp; + nLeftKey -= bytes; + nLeft -= bytes; + } while (nLeftKey > 0); + + while (nLeft > 0) { + // pack left val data to ovpages + lastPage = 0; + if (nLeft <= maxLocal - sizeof(SPgno)) { + bytes = nLeft; + lastPage = 1; + } else { + bytes = maxLocal - sizeof(SPgno); + } + + // fetch next ofp if not last page + if (!lastPage) { + // fetch a new ofp and make it dirty + ret = tdbFetchOvflPage(pPage->pPager, &pgno, &nextOfp, pTxn, pBt); + if (ret < 0) { + tdbFree(pBuf); + return -1; + } + } else { + pgno = 0; + } + + memcpy(pBuf, ((SCell *)pVal) + vLen - nLeft, bytes); + memcpy(pBuf + bytes, &pgno, sizeof(pgno)); + + ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0); + if (ret < 0) { + tdbFree(pBuf); + return -1; + } + + ofp = nextOfp; + nLeft -= bytes; + } + } + + // free local buffer + tdbFree(pBuf); + + *szPayload = nLocal; + + // ASSERT(0); } return 0; } static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell, - int *szCell) { + int *szCell, TXN *pTxn, SBTree *pBt) { u8 leaf; int nHeader; int nPayload; @@ -911,7 +1136,7 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo vLen = 0; } - ret = tdbBtreeEncodePayload(pPage, pCell, nHeader, pKey, kLen, pVal, vLen, &nPayload); + ret = tdbBtreeEncodePayload(pPage, pCell, nHeader, pKey, kLen, pVal, vLen, &nPayload, pTxn, pBt); if (ret < 0) { // TODO ASSERT(0); @@ -922,8 +1147,13 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo return 0; } -static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, SCellDecoder *pDecoder) { +static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, SCellDecoder *pDecoder, TXN *pTxn, SBTree *pBt) { + int ret = 0; int nPayload; + int maxLocal = pPage->maxLocal; + + int kLen = pDecoder->kLen; + int vLen = pDecoder->vLen; if (pDecoder->pVal) { ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pPage)); @@ -932,24 +1162,171 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, nPayload = pDecoder->kLen + pDecoder->vLen; } - if (nHeader + nPayload <= pPage->maxLocal) { + if (nHeader + nPayload <= maxLocal) { // no over flow case - pDecoder->pKey = pCell + nHeader; + pDecoder->pKey = (SCell *)pCell + nHeader; if (pDecoder->pVal == NULL && pDecoder->vLen > 0) { - pDecoder->pVal = pCell + nHeader + pDecoder->kLen; + pDecoder->pVal = (SCell *)pCell + nHeader + pDecoder->kLen; } return 0; - } + } else { + // handle overflow case + // calc local storage size + int minLocal = pPage->minLocal; + int surplus = minLocal + (nPayload + nHeader - minLocal) % (maxLocal - sizeof(SPgno)); + int nLocal = surplus <= maxLocal ? surplus : minLocal; - { - // TODO: handle overflow case - ASSERT(0); + int nLeft = nPayload; + SPgno pgno = 0; + SPage *ofp; + SCell *ofpCell; + int bytes; + int lastPage = 0; + + if (nLocal >= pDecoder->kLen + 4) { + pDecoder->pKey = (SCell *)pCell + nHeader; + nLeft -= kLen; + if (nLocal > kLen + 4) { + // read partial val to local + pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen); + if (pDecoder->pVal == NULL) { + return -1; + } + TDB_CELLDECODER_SET_FREE_VAL(pDecoder); + + memcpy(pDecoder->pVal, pCell + nHeader + kLen, nLocal - kLen - sizeof(SPgno)); + + nLeft -= nLocal - kLen - sizeof(SPgno); + } + + memcpy(&pgno, pCell + nHeader + nPayload - nLeft, sizeof(pgno)); + + // unpack left val data from ovpages + while (pgno != 0) { + ret = tdbLoadOvflPage(pPage->pPager, &pgno, &ofp, pTxn, pBt); + if (ret < 0) { + return -1; + } + + ofpCell = tdbPageGetCell(ofp, 0); + + if (nLeft <= ofp->maxLocal - sizeof(SPgno)) { + bytes = nLeft; + lastPage = 1; + } else { + bytes = ofp->maxLocal - sizeof(SPgno); + } + + memcpy(pDecoder->pVal + vLen - nLeft, ofpCell, bytes); + nLeft -= bytes; + + memcpy(&pgno, ofpCell + bytes, sizeof(pgno)); + } + } else { + int nLeftKey = kLen; + // load partial key and nextPgno + pDecoder->pKey = tdbRealloc(pDecoder->pKey, kLen); + if (pDecoder->pKey == NULL) { + return -1; + } + TDB_CELLDECODER_SET_FREE_KEY(pDecoder); + + memcpy(pDecoder->pKey, pCell + nHeader, nLocal - 4); + nLeft -= nLocal - 4; + nLeftKey -= nLocal -4; + + memcpy(&pgno, pCell + nHeader + nLocal - 4, sizeof(pgno)); + + int lastKeyPageSpace = 0; + // load left key & val to ovpages + while (pgno != 0) { + ret = tdbLoadOvflPage(pPage->pPager, &pgno, &ofp, pTxn, pBt); + if (ret < 0) { + return -1; + } + + ofpCell = tdbPageGetCell(ofp, 0); + + int lastKeyPage = 0; + if (nLeftKey <= maxLocal - sizeof(SPgno)) { + bytes = nLeftKey; + lastKeyPage = 1; + lastKeyPageSpace = ofp->maxLocal - sizeof(SPgno) - nLeftKey; + } else { + bytes = ofp->maxLocal - sizeof(SPgno); + } + + // cpy key + memcpy(pDecoder->pKey + kLen - nLeftKey, ofpCell, bytes); + + if (lastKeyPage) { + if (lastKeyPageSpace >= vLen) { + pDecoder->pVal = ofpCell + kLen -nLeftKey; + + nLeft -= vLen; + pgno = 0; + } else { + // read partial val to local + pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen); + if (pDecoder->pVal == NULL) { + return -1; + } + TDB_CELLDECODER_SET_FREE_VAL(pDecoder); + + memcpy(pDecoder->pVal, ofpCell + kLen -nLeftKey, lastKeyPageSpace); + nLeft -= lastKeyPageSpace; + } + } + + memcpy(&pgno, ofpCell + bytes, sizeof(pgno)); + + nLeftKey -= bytes; + nLeft -= bytes; + } + + while (nLeft > 0) { + ret = tdbLoadOvflPage(pPage->pPager, &pgno, &ofp, pTxn, pBt); + if (ret < 0) { + return -1; + } + + ofpCell = tdbPageGetCell(ofp, 0); + + // load left val data to ovpages + lastPage = 0; + if (nLeft <= ofp->maxLocal - sizeof(SPgno)) { + bytes = nLeft; + lastPage = 1; + } else { + bytes = ofp->maxLocal - sizeof(SPgno); + } + + if (lastPage) { + pgno = 0; + } + + if (!pDecoder->pVal) { + pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen); + if (pDecoder->pVal == NULL) { + return -1; + } + TDB_CELLDECODER_SET_FREE_VAL(pDecoder); + } + + memcpy(pDecoder->pVal, ofpCell + vLen - nLeft, bytes); + nLeft -= bytes; + + memcpy(&pgno, ofpCell + vLen - nLeft + bytes, sizeof(pgno)); + + nLeft -= bytes; + } + } } return 0; } -static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder) { +static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder, TXN *pTxn, SBTree *pBt) { u8 leaf; int nHeader; int ret; @@ -963,6 +1340,7 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD pDecoder->vLen = -1; pDecoder->pVal = NULL; pDecoder->pgno = 0; + TDB_CELLDECODER_SET_FREE_NIL(pDecoder); // 1. Decode header part if (!leaf) { @@ -987,7 +1365,7 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD } // 2. Decode payload part - ret = tdbBtreeDecodePayload(pPage, pCell, nHeader, pDecoder); + ret = tdbBtreeDecodePayload(pPage, pCell, nHeader, pDecoder, pTxn, pBt); if (ret < 0) { return -1; } @@ -995,41 +1373,71 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD return 0; } -static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell) { +static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *pTxn, SBTree *pBt) { u8 leaf; - int szCell; - int kLen = 0, vLen = 0; + int kLen = 0, vLen = 0, nHeader = 0; leaf = TDB_BTREE_PAGE_IS_LEAF(pPage); - szCell = 0; if (!leaf) { - szCell += sizeof(SPgno); + nHeader += sizeof(SPgno); } if (pPage->kLen == TDB_VARIANT_LEN) { - szCell += tdbGetVarInt(pCell + szCell, &kLen); + nHeader += tdbGetVarInt(pCell + nHeader, &kLen); } else { kLen = pPage->kLen; } if (pPage->vLen == TDB_VARIANT_LEN) { ASSERT(leaf); - szCell += tdbGetVarInt(pCell + szCell, &vLen); + nHeader += tdbGetVarInt(pCell + nHeader, &vLen); } else if (leaf) { vLen = pPage->vLen; } - szCell = szCell + kLen + vLen; + int nPayload = kLen + vLen; + if (nHeader + nPayload <= pPage->maxLocal) { + return nHeader + kLen + vLen; + } else { + int maxLocal = pPage->maxLocal; - if (szCell <= pPage->maxLocal) { - return szCell; - } + // calc local storage size + int minLocal = pPage->minLocal; + int surplus = minLocal + (nPayload + nHeader - minLocal) % (maxLocal - sizeof(SPgno)); + int nLocal = surplus <= maxLocal ? surplus : minLocal; - { - // TODO - ASSERT(0); - return 0; + // free ofp pages' cells + if (dropOfp) { + int ret = 0; + SPgno pgno = *(SPgno *) (pCell + nHeader + nLocal - sizeof(SPgno)); + int nLeft = nPayload - nLocal + sizeof(SPgno); + SPage *ofp; + int bytes; + + while (pgno != 0) { + ret = tdbLoadOvflPage(pPage->pPager, &pgno, &ofp, pTxn, pBt); + if (ret < 0) { + return -1; + } + + SCell *ofpCell = tdbPageGetCell(ofp, 0); + + if (nLeft <= ofp->maxLocal - sizeof(SPgno)) { + bytes = nLeft; + } else { + bytes = ofp->maxLocal - sizeof(SPgno); + } + + memcpy(&pgno, ofpCell + bytes, sizeof(pgno)); + + tdbPagerReturnPage(pPage->pPager, ofp, pTxn); + + nLeft -= bytes; + } + } + + return nHeader + nLocal; } } // TDB_BTREE_CELL @@ -1212,7 +1620,7 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx); - tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd); + tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd, pBtc->pTxn, pBtc->pBt); pKey = tdbRealloc(*ppKey, cd.kLen); if (pKey == NULL) { @@ -1258,7 +1666,7 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx); - tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd); + tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd, pBtc->pTxn, pBtc->pBt); pKey = tdbRealloc(*ppKey, cd.kLen); if (pKey == NULL) { @@ -1427,7 +1835,7 @@ int tdbBtcGet(SBTC *pBtc, const void **ppKey, int *kLen, const void **ppVal, int } pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx); - tdbBtreeDecodeCell(pBtc->pPage, pCell, &pBtc->coder); + tdbBtreeDecodeCell(pBtc->pPage, pCell, &pBtc->coder, pBtc->pTxn, pBtc->pBt); if (ppKey) { *ppKey = (void *)pBtc->coder.pKey; @@ -1464,7 +1872,7 @@ int tdbBtcDelete(SBTC *pBtc) { return -1; } - tdbPageDropCell(pBtc->pPage, idx); + tdbPageDropCell(pBtc->pPage, idx, pBtc->pTxn, pBtc->pBt); // update interior page or do balance if (idx == nCells - 1) { @@ -1488,9 +1896,9 @@ int tdbBtcDelete(SBTC *pBtc) { // update the cell with new key pCell = tdbOsMalloc(nKey + 9); - tdbBtreeEncodeCell(pPage, pKey, nKey, &pgno, sizeof(pgno), pCell, &szCell); + tdbBtreeEncodeCell(pPage, pKey, nKey, &pgno, sizeof(pgno), pCell, &szCell, pBtc->pTxn, pBtc->pBt); - ret = tdbPageUpdateCell(pPage, idx, pCell, szCell); + ret = tdbPageUpdateCell(pPage, idx, pCell, szCell, pBtc->pTxn, pBtc->pBt); if (ret < 0) { tdbOsFree(pCell); ASSERT(0); @@ -1529,7 +1937,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int // alloc space szBuf = kLen + nData + 14; - pBuf = tdbRealloc(pBtc->pBt->pBuf, pBtc->pBt->pageSize > szBuf ? szBuf : pBtc->pBt->pageSize); + pBuf = tdbRealloc(pBtc->pBt->pBuf, pBtc->pBt->pageSize > szBuf ? szBuf : pBtc->pBt->pageSize); if (pBuf == NULL) { ASSERT(0); return -1; @@ -1538,7 +1946,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int pCell = (SCell *)pBtc->pBt->pBuf; // encode cell - ret = tdbBtreeEncodeCell(pBtc->pPage, pKey, kLen, pData, nData, pCell, &szCell); + ret = tdbBtreeEncodeCell(pBtc->pPage, pKey, kLen, pData, nData, pCell, &szCell, pBtc->pTxn, pBtc->pBt); if (ret < 0) { ASSERT(0); return -1; @@ -1559,7 +1967,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int } else { ASSERT(pBtc->idx < nCells); - ret = tdbPageUpdateCell(pBtc->pPage, pBtc->idx, pCell, szCell); + ret = tdbPageUpdateCell(pBtc->pPage, pBtc->idx, pCell, szCell, pBtc->pTxn, pBtc->pBt); } if (ret < 0) { ASSERT(0); @@ -1620,7 +2028,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) { // check if key <= current position if (idx < nCells) { pCell = tdbPageGetCell(pPage, idx); - tdbBtreeDecodeCell(pPage, pCell, &cd); + tdbBtreeDecodeCell(pPage, pCell, &cd, pBtc->pTxn, pBtc->pBt); c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen); if (c > 0) break; } @@ -1629,7 +2037,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) { if (idx > 0) { pCell = tdbPageGetCell(pPage, idx - 1); tdbBtreeDecodeCell(pPage, pCell, &cd); - c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen); + c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen, pBtc->pTxn, pBtc->pBt); if (c <= 0) break; } } @@ -1769,4 +2177,4 @@ void tdbBtPageInfo(SPage *pPage, int idx) { pBtPageInfo->nOvfl = pPage->nOverflow; } #endif -// TDB_BTREE_DEBUG \ No newline at end of file +// TDB_BTREE_DEBUG diff --git a/source/libs/tdb/src/db/tdbPage.c b/source/libs/tdb/src/db/tdbPage.c index 78470b6256..7a70b621c6 100644 --- a/source/libs/tdb/src/db/tdbPage.c +++ b/source/libs/tdb/src/db/tdbPage.c @@ -82,7 +82,8 @@ int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg) return 0; } -void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *)) { +void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int, + TXN *, SBTree *pBt)) { pPage->pPageHdr = pPage->pData + szAmHdr; TDB_PAGE_NCELLS_SET(pPage, 0); TDB_PAGE_CCELLS_SET(pPage, pPage->pageSize - sizeof(SPageFtr)); @@ -98,7 +99,8 @@ void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell ASSERT((u8 *)pPage->pPageFtr == pPage->pFreeEnd); } -void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *)) { +void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int, + TXN *, SBTree *pBt)) { pPage->pPageHdr = pPage->pData + szAmHdr; pPage->pCellIdx = pPage->pPageHdr + TDB_PAGE_HDR_SIZE(pPage); pPage->pFreeStart = pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * TDB_PAGE_NCELLS(pPage); @@ -171,12 +173,12 @@ int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl return 0; } -int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell) { - tdbPageDropCell(pPage, idx); +int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell, TXN *pTxn, SBTree *pBt) { + tdbPageDropCell(pPage, idx, pTxn, pBt); return tdbPageInsertCell(pPage, idx, pCell, szCell, 0); } -int tdbPageDropCell(SPage *pPage, int idx) { +int tdbPageDropCell(SPage *pPage, int idx, TXN *pTxn, SBTree *pBt) { int lidx; SCell *pCell; int szCell; @@ -205,7 +207,7 @@ int tdbPageDropCell(SPage *pPage, int idx) { lidx = idx - iOvfl; pCell = TDB_PAGE_CELL_AT(pPage, lidx); - szCell = (*pPage->xCellSize)(pPage, pCell); + szCell = (*pPage->xCellSize)(pPage, pCell, 1, pTxn, pBt); tdbPageFree(pPage, lidx, pCell, szCell); TDB_PAGE_NCELLS_SET(pPage, nCells - 1); @@ -420,7 +422,7 @@ static int tdbPageDefragment(SPage *pPage) { ASSERT(pCell != NULL); - szCell = (*pPage->xCellSize)(pPage, pCell); + szCell = (*pPage->xCellSize)(pPage, pCell, 0, NULL, NULL); ASSERT(pCell + szCell <= pNextCell); if (pCell + szCell < pNextCell) { diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h index 71e009cabf..39b5584fab 100644 --- a/source/libs/tdb/src/inc/tdbInt.h +++ b/source/libs/tdb/src/inc/tdbInt.h @@ -116,13 +116,25 @@ typedef struct SBtInfo { int nData; } SBtInfo; +#define TDB_CELLD_F_NIL 0x0 +#define TDB_CELLD_F_KEY 0x1 +#define TDB_CELLD_F_VAL 0x2 + +#define TDB_CELLDECODER_SET_FREE_NIL(pCellDecoder) ((pCellDecoder)->freeKV = TDB_CELLD_F_NIL) +#define TDB_CELLDECODER_SET_FREE_KEY(pCellDecoder) ((pCellDecoder)->freeKV |= TDB_CELLD_F_KEY) +#define TDB_CELLDECODER_SET_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV |= TDB_CELLD_F_VAL) + +#define TDB_CELLDECODER_FREE_KEY(pCellDecoder) ((pCellDecoder)->freeKV & TDB_CELLD_F_KEY) +#define TDB_CELLDECODER_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV & TDB_CELLD_F_VAL) + typedef struct { int kLen; - const u8 *pKey; + u8 *pKey; int vLen; - const u8 *pVal; + u8 *pVal; SPgno pgno; u8 *pBuf; + u8 freeKV; } SCellDecoder; struct SBTC { @@ -251,7 +263,7 @@ struct SPage { int vLen; // value length of the page, -1 for unknown int maxLocal; int minLocal; - int (*xCellSize)(const SPage *, SCell *); + int (*xCellSize)(const SPage *, SCell *, int, TXN *pTxn, SBTree *pBt); // Fields used by SPCache TDB_PCACHE_PAGE }; @@ -298,16 +310,18 @@ static inline int tdbTryLockPage(tdb_spinlock_t *pLock) { #define TDB_PAGE_USABLE_SIZE(pPage) ((u8 *)(pPage)->pPageFtr - (pPage)->pCellIdx) #define TDB_PAGE_FREE_SIZE(pPage) (*(pPage)->pPageMethods->getFreeBytes)(pPage) #define TDB_PAGE_PGNO(pPage) ((pPage)->pgid.pgno) -#define TDB_BYTES_CELL_TAKEN(pPage, pCell) ((*(pPage)->xCellSize)(pPage, pCell) + (pPage)->pPageMethods->szOffset) +#define TDB_BYTES_CELL_TAKEN(pPage, pCell) ((*(pPage)->xCellSize)(pPage, pCell, 0, NULL, NULL) + (pPage)->pPageMethods->szOffset) #define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset) int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t), void *arg); int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg); -void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *)); -void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *)); +void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int, + TXN *, SBTree *pBt)); +void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int, + TXN *, SBTree *pBt)); int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl); -int tdbPageDropCell(SPage *pPage, int idx); -int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell); +int tdbPageDropCell(SPage *pPage, int idx, TXN *pTxn, SBTree *pBt); +int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell, TXN *pTxn, SBTree *pBt); void tdbPageCopy(SPage *pFromPage, SPage *pToPage); int tdbPageCapacity(int pageSize, int amHdrSize); diff --git a/source/libs/tdb/test/CMakeLists.txt b/source/libs/tdb/test/CMakeLists.txt index b2c8aaf9bc..2621e02b02 100644 --- a/source/libs/tdb/test/CMakeLists.txt +++ b/source/libs/tdb/test/CMakeLists.txt @@ -4,4 +4,9 @@ target_link_libraries(tdbTest tdb gtest gtest_main) # tdbUtilTest add_executable(tdbUtilTest "tdbUtilTest.cpp") -target_link_libraries(tdbUtilTest tdb gtest gtest_main) \ No newline at end of file +target_link_libraries(tdbUtilTest tdb gtest gtest_main) + +# tdbUtilTest +add_executable(tdbExOVFLTest "tdbExOVFLTest.cpp") +target_link_libraries(tdbExOVFLTest tdb gtest gtest_main) + diff --git a/source/libs/tdb/test/tdbExOVFLTest.cpp b/source/libs/tdb/test/tdbExOVFLTest.cpp new file mode 100644 index 0000000000..2d8d012a6a --- /dev/null +++ b/source/libs/tdb/test/tdbExOVFLTest.cpp @@ -0,0 +1,469 @@ +#include + +#define ALLOW_FORBID_FUNC +#include "os.h" +#include "tdb.h" + +#include +#include +#include +#include + +typedef struct SPoolMem { + int64_t size; + struct SPoolMem *prev; + struct SPoolMem *next; +} SPoolMem; + +static SPoolMem *openPool() { + SPoolMem *pPool = (SPoolMem *)taosMemoryMalloc(sizeof(*pPool)); + + pPool->prev = pPool->next = pPool; + pPool->size = 0; + + return pPool; +} + +static void clearPool(SPoolMem *pPool) { + SPoolMem *pMem; + + do { + pMem = pPool->next; + + if (pMem == pPool) break; + + pMem->next->prev = pMem->prev; + pMem->prev->next = pMem->next; + pPool->size -= pMem->size; + + taosMemoryFree(pMem); + } while (1); + + assert(pPool->size == 0); +} + +static void closePool(SPoolMem *pPool) { + clearPool(pPool); + taosMemoryFree(pPool); +} + +static void *poolMalloc(void *arg, size_t size) { + void *ptr = NULL; + SPoolMem *pPool = (SPoolMem *)arg; + SPoolMem *pMem; + + pMem = (SPoolMem *)taosMemoryMalloc(sizeof(*pMem) + size); + if (pMem == NULL) { + assert(0); + } + + pMem->size = sizeof(*pMem) + size; + pMem->next = pPool->next; + pMem->prev = pPool; + + pPool->next->prev = pMem; + pPool->next = pMem; + pPool->size += pMem->size; + + ptr = (void *)(&pMem[1]); + return ptr; +} + +static void poolFree(void *arg, void *ptr) { + SPoolMem *pPool = (SPoolMem *)arg; + SPoolMem *pMem; + + pMem = &(((SPoolMem *)ptr)[-1]); + + pMem->next->prev = pMem->prev; + pMem->prev->next = pMem->next; + pPool->size -= pMem->size; + + taosMemoryFree(pMem); +} + +static int tKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) { + int k1, k2; + + std::string s1((char *)pKey1 + 3, kLen1 - 3); + std::string s2((char *)pKey2 + 3, kLen2 - 3); + k1 = stoi(s1); + k2 = stoi(s2); + + if (k1 < k2) { + return -1; + } else if (k1 > k2) { + return 1; + } else { + return 0; + } +} + +static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2) { + int mlen; + int cret; + + ASSERT(keyLen1 > 0 && keyLen2 > 0 && pKey1 != NULL && pKey2 != NULL); + + mlen = keyLen1 < keyLen2 ? keyLen1 : keyLen2; + cret = memcmp(pKey1, pKey2, mlen); + if (cret == 0) { + if (keyLen1 < keyLen2) { + cret = -1; + } else if (keyLen1 > keyLen2) { + cret = 1; + } else { + cret = 0; + } + } + return cret; +} + +TEST(TdbOVFLPagesTest, TbUpsertTest) { + +} + +TEST(TdbOVFLPagesTest, TbPGetTest) { + +} + +static void generateBigVal(char *val, int valLen) { + for (int i = 0; i < valLen; ++i) { + char c = char(i & 0xff); + if (c == 0) { + c = 1; + } + val[i] = c; + } +} + +static TDB *openEnv(char const *envName, int const pageSize, int const pageNum) { + TDB *pEnv = NULL; + + int ret = tdbOpen(envName, pageSize, pageNum, &pEnv); + if (ret) { + pEnv = NULL; + } + + return pEnv; +} + +static void insertOfp(void) { + int ret = 0; + + taosRemoveDir("tdb"); + + // open Env + int const pageSize = 4096; + int const pageNum = 64; + TDB *pEnv = openEnv("tdb", pageSize, pageNum); + GTEST_ASSERT_NE(pEnv, nullptr); + + // open db + TTB *pDb = NULL; + tdb_cmpr_fn_t compFunc = tKeyCmpr; + ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb); + GTEST_ASSERT_EQ(ret, 0); + + // open the pool + SPoolMem *pPool = openPool(); + + // start a transaction + TXN txn; + int64_t txnid = 0; + ++txnid; + tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + tdbBegin(pEnv, &txn); + + // generate value payload + char val[((4083 - 4 - 3 - 2)+1)*100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) + int valLen = sizeof(val) / sizeof(val[0]); + generateBigVal(val, valLen); + + // insert the generated big data + ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, &txn); + GTEST_ASSERT_EQ(ret, 0); + + // commit current transaction + tdbCommit(pEnv, &txn); + tdbTxnClose(&txn); +} + +//TEST(TdbOVFLPagesTest, DISABLED_TbInsertTest) { +TEST(TdbOVFLPagesTest, TbInsertTest) { + insertOfp(); +} + +//TEST(TdbOVFLPagesTest, DISABLED_TbGetTest) { +TEST(TdbOVFLPagesTest, TbGetTest) { + insertOfp(); + + // open Env + int const pageSize = 4096; + int const pageNum = 64; + TDB *pEnv = openEnv("tdb", pageSize, pageNum); + GTEST_ASSERT_NE(pEnv, nullptr); + + // open db + TTB *pDb = NULL; + tdb_cmpr_fn_t compFunc = tKeyCmpr; + int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb); + GTEST_ASSERT_EQ(ret, 0); + + // generate value payload + char val[((4083 - 4 - 3 - 2)+1)*100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) + int valLen = sizeof(val) / sizeof(val[0]); + generateBigVal(val, valLen); + + { // Query the data + void *pVal = NULL; + int vLen; + + ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen); + ASSERT(ret == 0); + GTEST_ASSERT_EQ(ret, 0); + + GTEST_ASSERT_EQ(vLen, valLen); + GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0); + + tdbFree(pVal); + } +} + +TEST(TdbOVFLPagesTest, TbDeleteTest) { + int ret = 0; + + taosRemoveDir("tdb"); + + // open Env + int const pageSize = 4096; + int const pageNum = 64; + TDB *pEnv = openEnv("tdb", pageSize, pageNum); + GTEST_ASSERT_NE(pEnv, nullptr); + + // open db + TTB *pDb = NULL; + tdb_cmpr_fn_t compFunc = tKeyCmpr; + ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb); + GTEST_ASSERT_EQ(ret, 0); + + // open the pool + SPoolMem *pPool = openPool(); + + // start a transaction + TXN txn; + int64_t txnid = 0; + ++txnid; + tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + tdbBegin(pEnv, &txn); + + // generate value payload + char val[((4083 - 4 - 3 - 2)+1)*100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) + int valLen = sizeof(val) / sizeof(val[0]); + generateBigVal(val, valLen); + + { // insert the generated big data + ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, &txn); + GTEST_ASSERT_EQ(ret, 0); + } + + { // query the data + void *pVal = NULL; + int vLen; + + ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen); + ASSERT(ret == 0); + GTEST_ASSERT_EQ(ret, 0); + + GTEST_ASSERT_EQ(vLen, valLen); + GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0); + + tdbFree(pVal); + } + /* open to debug committed file + tdbCommit(pEnv, &txn); + tdbTxnClose(&txn); + + ++txnid; + tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + tdbBegin(pEnv, &txn); + */ + { // upsert the data + ret = tdbTbUpsert(pDb, "key1", strlen("key1"), "value1", strlen("value1"), &txn); + GTEST_ASSERT_EQ(ret, 0); + } + + { // query the upserted data + void *pVal = NULL; + int vLen; + + ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen); + ASSERT(ret == 0); + GTEST_ASSERT_EQ(ret, 0); + + GTEST_ASSERT_EQ(vLen, strlen("value1")); + GTEST_ASSERT_EQ(memcmp("value1", pVal, vLen), 0); + + tdbFree(pVal); + } + + { // delete the data + ret = tdbTbDelete(pDb, "key1", strlen("key1"), &txn); + GTEST_ASSERT_EQ(ret, 0); + } + + { // query the deleted data + void *pVal = NULL; + int vLen = -1; + + ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen); + ASSERT(ret == -1); + GTEST_ASSERT_EQ(ret, -1); + + GTEST_ASSERT_EQ(vLen, -1); + GTEST_ASSERT_EQ(pVal, nullptr); + + tdbFree(pVal); + } + + // commit current transaction + tdbCommit(pEnv, &txn); + tdbTxnClose(&txn); +} + +TEST(tdb_test, DISABLED_simple_insert1) { +//TEST(tdb_test, simple_insert1) { + int ret; + TDB *pEnv; + TTB *pDb; + tdb_cmpr_fn_t compFunc; + int nData = 1; + TXN txn; + int const pageSize = 4096; + + taosRemoveDir("tdb"); + + // Open Env + ret = tdbOpen("tdb", pageSize, 64, &pEnv); + GTEST_ASSERT_EQ(ret, 0); + + // Create a database + compFunc = tKeyCmpr; + ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); + GTEST_ASSERT_EQ(ret, 0); + + { + char key[64]; + //char val[(4083 - 4 - 3 - 2)]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) + char val[(4083 - 4 - 3 - 2)+1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) + int64_t poolLimit = 4096; // 1M pool limit + int64_t txnid = 0; + SPoolMem *pPool; + + // open the pool + pPool = openPool(); + + // start a transaction + txnid++; + tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + tdbBegin(pEnv, &txn); + + for (int iData = 1; iData <= nData; iData++) { + sprintf(key, "key0"); + sprintf(val, "value%d", iData); + + //ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn); + //GTEST_ASSERT_EQ(ret, 0); + + // generate value payload + int valLen = sizeof(val) / sizeof(val[0]); + for (int i = 6; i < valLen; ++i) { + char c = char(i & 0xff); + if (c == 0) { + c = 1; + } + val[i] = c; + } + + ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, &txn); + GTEST_ASSERT_EQ(ret, 0); + + // if pool is full, commit the transaction and start a new one + if (pPool->size >= poolLimit) { + // commit current transaction + tdbCommit(pEnv, &txn); + tdbTxnClose(&txn); + + // start a new transaction + clearPool(pPool); + txnid++; + tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + tdbBegin(pEnv, &txn); + } + } + + // commit the transaction + tdbCommit(pEnv, &txn); + tdbTxnClose(&txn); + + { // Query the data + void *pVal = NULL; + int vLen; + + for (int i = 1; i <= nData; i++) { + sprintf(key, "key%d", i); + sprintf(val, "value%d", i); + + ret = tdbTbGet(pDb, key, strlen(key), &pVal, &vLen); + ASSERT(ret == 0); + GTEST_ASSERT_EQ(ret, 0); + + GTEST_ASSERT_EQ(vLen, strlen(val)); + GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0); + } + + tdbFree(pVal); + } + + { // Iterate to query the DB data + TBC *pDBC; + void *pKey = NULL; + void *pVal = NULL; + int vLen, kLen; + int count = 0; + + ret = tdbTbcOpen(pDb, &pDBC, NULL); + GTEST_ASSERT_EQ(ret, 0); + + tdbTbcMoveToFirst(pDBC); + + for (;;) { + ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen); + if (ret < 0) break; + + // std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " "; + // std::cout.write((char *)pVal, vLen) /* << " " << vLen */; + // std::cout << std::endl; + + count++; + } + + GTEST_ASSERT_EQ(count, nData); + + tdbTbcClose(pDBC); + + tdbFree(pKey); + tdbFree(pVal); + } + } + + ret = tdbTbDrop(pDb); + GTEST_ASSERT_EQ(ret, 0); + + // Close a database + tdbTbClose(pDb); + + // Close Env + ret = tdbClose(pEnv); + GTEST_ASSERT_EQ(ret, 0); +} diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index c1ab51bb27..41bbfaa938 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -72,14 +72,14 @@ ./test.sh -f tsim/stream/basic0.sim ./test.sh -f tsim/stream/basic1.sim ./test.sh -f tsim/stream/basic2.sim -# ./test.sh -f tsim/stream/distributeInterval0.sim +./test.sh -f tsim/stream/distributeInterval0.sim # ./test.sh -f tsim/stream/distributesession0.sim # ./test.sh -f tsim/stream/session0.sim # ./test.sh -f tsim/stream/session1.sim # ./test.sh -f tsim/stream/state0.sim -# ./test.sh -f tsim/stream/triggerInterval0.sim +./test.sh -f tsim/stream/triggerInterval0.sim # ./test.sh -f tsim/stream/triggerSession0.sim -# ./test.sh -f tsim/stream/partitionby.sim +./test.sh -f tsim/stream/partitionby.sim ./test.sh -f tsim/stream/schedSnode.sim diff --git a/tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim b/tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim index 7b52b51306..e9dd82cad9 100644 --- a/tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim +++ b/tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim @@ -53,31 +53,10 @@ endi if $data(4)[4] != ready then goto step1 endi -#if $data(5)[4] != ready then -# goto step1 -#endi print =============== step2: create db sql create database d1 vgroups 1 replica 3 -# dnode not exist -sql_error redistribute vgroup 3 dnode 6 dnode 3 dnode 4 -# vgroup not exist -sql_error redistribute vgroup 3 dnode 5 dnode 3 dnode 4 -# un changed -sql_error redistribute vgroup 2 dnode 2 dnode 3 dnode 4 -# no enought vnodes -sql_error redistribute vgroup 2 dnode 1 dnode 3 dnode 4 -# offline vnodes -sql_error redistribute vgroup 2 dnode 5 dnode 3 dnode 4 -# Invalid replica -sql_error redistribute vgroup 2 dnode 5 -sql_error redistribute vgroup 2 dnode 5 dnode 3 -sql_error redistribute vgroup 2 dnode 2 dnode 3 -sql_error redistribute vgroup 2 dnode 2 dnode 2 -sql_error redistribute vgroup 3 dnode 2 dnode 2 -sql_error redistribute vgroup 2 dnode 2 dnode 2 dnode 3 - system sh/exec.sh -n dnode5 -s start $x = 0 step2: diff --git a/tests/script/tsim/sma/rsmaCreateInsertQuery.sim b/tests/script/tsim/sma/rsmaCreateInsertQuery.sim index 645b28f771..fb3503c841 100644 --- a/tests/script/tsim/sma/rsmaCreateInsertQuery.sim +++ b/tests/script/tsim/sma/rsmaCreateInsertQuery.sim @@ -5,7 +5,7 @@ sleep 50 sql connect print =============== create database with retentions -sql create database d0 retentions 15s:7d,1m:21d,15m:365d; +sql create database d0 retentions 5s:7d,10s:21d,15s:365d; sql use d0 print =============== create super table and register rsma @@ -29,6 +29,8 @@ sql insert into ct1 values(now, 10); sql insert into ct1 values(now+1s, 1); sql insert into ct1 values(now+2s, 100); +print =============== wait maxdelay 15+1 seconds for results +sleep 16000 print =============== select * from retention level 2 from memory sql select * from ct1; diff --git a/tests/system-test/1-insert/create_table_comment.py b/tests/system-test/1-insert/create_table_comment.py deleted file mode 100644 index 92ea083c5a..0000000000 --- a/tests/system-test/1-insert/create_table_comment.py +++ /dev/null @@ -1,113 +0,0 @@ -################################################################### -# Copyright (c) 2016 by TAOS Technologies, Inc. -# All rights reserved. -# -# This file is proprietary and confidential to TAOS Technologies. -# No part of this file may be reproduced, stored, transmitted, -# disclosed or used in any form or by any means other than as -# expressly provided by the written permission from Jianhui Tao -# -################################################################### - -# -*- coding: utf-8 -*- - -import random -import string -from util.log import * -from util.cases import * -from util.sql import * - -class TDTestCase: - def init(self, conn, logSql): - tdLog.debug("start to execute %s" % __file__) - tdSql.init(conn.cursor()) - - def get_long_name(self, length, mode="mixed"): - """ - generate long name - mode could be numbers/letters/letters_mixed/mixed - """ - if mode == "numbers": - population = string.digits - elif mode == "letters": - population = string.ascii_letters.lower() - elif mode == "letters_mixed": - population = string.ascii_letters.upper() + string.ascii_letters.lower() - else: - population = string.ascii_letters.lower() + string.digits - return "".join(random.choices(population, k=length)) - - def __create_tb(self,dbname,stbname,tbname,comment): - tdSql.execute(f'create database if not exists {dbname}') - tdSql.execute(f'use {dbname}') - tdSql.execute( - f'create table {stbname} (ts timestamp,c0 int) tags(t0 int) ') - tdSql.execute( - f'create table {tbname} using {stbname} tags(1) comment "{comment}"') - def __create_normaltb(self,dbname,tbname,comment): - tdSql.execute(f'create database if not exists {dbname}') - tdSql.execute(f'use {dbname}') - tdSql.execute( - f'create table {tbname} (ts timestamp,c0 int) comment "{comment}"') - - def check_comment(self): - dbname = self.get_long_name(length=10, mode="letters") - ntbname = self.get_long_name(length=5, mode="letters") - - # create normal table with comment - comment = self.get_long_name(length=10, mode="letters") - self.__create_normaltb(dbname,ntbname,comment) - ntb_kv_list = tdSql.getResult("show tables") - print(ntb_kv_list) - tdSql.checkEqual(ntb_kv_list[0][8], comment) - tdSql.error('alter table {ntbname} comment "test1"') - tdSql.execute(f'drop database {dbname}') - - # max length(1024) - comment = self.get_long_name(length=1024, mode="letters") - self.__create_normaltb(dbname,ntbname,comment) - ntb_kv_list = tdSql.getResult("show tables") - tdSql.checkEqual(ntb_kv_list[0][8], comment) - tdSql.execute(f'drop database {dbname}') - - # error overlength - comment = self.get_long_name(length=1025, mode="letters") - tdSql.execute(f'create database if not exists {dbname}') - tdSql.execute(f'use {dbname}') - tdSql.error(f"create table ntb (ts timestamp,c0 int) comment '{comment}'") - tdSql.execute(f'drop database {dbname}') - - # create child table with comment - comment = self.get_long_name(length=10, mode="letters") - stbname = self.get_long_name(length=5, mode="letters") - tbname = self.get_long_name(length=3, mode="letters") - self.__create_tb(dbname,stbname,tbname,comment) - ntb_kv_list = tdSql.getResult("show tables") - tdSql.checkEqual(ntb_kv_list[0][8], comment) - tdSql.error(f'alter table {tbname} comment "test1"') - tdSql.execute(f'drop database {dbname}') - - # max length 1024 - comment = self.get_long_name(length=1024, mode="letters") - self.__create_tb(dbname,ntbname,comment) - ntb_kv_list = tdSql.getResult("show tables") - tdSql.checkEqual(ntb_kv_list[0][8], comment) - tdSql.execute(f'drop database {dbname}') - - # error overlength - comment = self.get_long_name(length=1025, mode="letters") - tdSql.execute(f'create database if not exists {dbname}') - tdSql.execute(f'use {dbname}') - tdSql.execute(f"create table stb (ts timestamp,c0 int) tags(t0 int)") - tdSql.error(f'create table stb_1 us stb tags(1) comment "{comment}"') - tdSql.execute(f'drop database {dbname}') - - def run(self): - self.check_comment() - - def stop(self): - tdSql.close() - tdLog.success("%s successfully executed" % __file__) - -tdCases.addWindows(__file__, TDTestCase()) -tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/1-insert/table_comment.py b/tests/system-test/1-insert/table_comment.py new file mode 100644 index 0000000000..5b85a3964f --- /dev/null +++ b/tests/system-test/1-insert/table_comment.py @@ -0,0 +1,134 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import random +import string + +from util.log import * +from util.cases import * +from util.sql import * +from util.common import * + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + # prepare data + self.ntbname = 'ntb' + self.stbname = 'stb' + self.column_dict = { + 'ts':'timestamp', + 'c1':'int', + 'c2':'float', + 'c3':'double', + 'c4':'timestamp' + } + self.tag_dict = { + 't0':'int' + } + self.comment_length = [0,1024] + self.error_comment_length = [1025] + self.table_type_list = ['normal_table','stable','child_table'] + self.comment_flag_list = [True,False] + + def __set_and_alter_comment(self,tb_type='',comment_flag= False): + + column_sql = '' + tag_sql = '' + for k,v in self.column_dict.items(): + column_sql += f"{k} {v}," + for k,v in self.tag_dict.items(): + tag_sql += f"{k} {v}," + if tb_type == 'normal_table' or tb_type == '': + if comment_flag == False: + tdSql.execute(f'create table {self.ntbname} ({column_sql[:-1]})') + self.check_comment_info() + self.alter_comment(self.ntbname) + tdSql.execute(f'drop table {self.ntbname}') + elif comment_flag == True: + for i in self.comment_length: + comment_info = tdCom.getLongName(i) + tdSql.execute(f'create table {self.ntbname} ({column_sql[:-1]}) comment "{comment_info}"') + self.check_comment_info(comment_info) + self.alter_comment(self.ntbname) + tdSql.execute(f'drop table {self.ntbname}') + for i in self.error_comment_length: + comment_info = tdCom.getLongName(i) + tdSql.error(f'create table {self.ntbname} ({column_sql[:-1]}) comment "{comment_info}"') + elif tb_type == 'stable': + for operation in ['table','stable']: + if comment_flag == False: + tdSql.execute(f'create {operation} {self.stbname} ({column_sql[:-1]}) tags({tag_sql[:-1]})') + self.check_comment_info(None,'stable') + self.alter_comment(self.stbname,'stable') + tdSql.execute(f'drop table {self.stbname}') + elif comment_flag == True: + for i in self.comment_length: + comment_info = tdCom.getLongName(i) + tdSql.execute(f'create {operation} {self.stbname} ({column_sql[:-1]}) tags({tag_sql[:-1]}) comment "{comment_info}"') + self.check_comment_info(comment_info,'stable') + self.alter_comment(self.stbname,'stable') + tdSql.execute(f'drop table {self.stbname}') + elif tb_type == 'child_table': + tdSql.execute(f'create table if not exists {self.stbname} ({column_sql[:-1]}) tags({tag_sql[:-1]})') + if comment_flag == False: + tdSql.execute(f'create table if not exists {self.stbname}_ctb using {self.stbname} tags(1)') + self.check_comment_info() + self.alter_comment(f'{self.stbname}_ctb') + tdSql.execute(f'drop table {self.stbname}_ctb') + elif comment_flag == True: + for j in self.comment_length: + comment_info = tdCom.getLongName(j) + tdSql.execute(f'create table if not exists {self.stbname}_ctb using {self.stbname} tags(1) comment "{comment_info}"') + self.check_comment_info(comment_info) + self.alter_comment(f'{self.stbname}_ctb') + tdSql.execute(f'drop table {self.stbname}_ctb') + tdSql.execute(f'drop table {self.stbname}') + def alter_comment(self,tbname,tb_type=''): + for i in self.comment_length: + comment_info = tdCom.getLongName(i) + print(comment_info) + tdSql.execute(f'alter table {tbname} comment "{comment_info}"') + self.check_comment_info(comment_info,tb_type) + for i in self.error_comment_length: + comment_info = tdCom.getLongName(i) + tdSql.error(f'alter table {tbname} comment "{comment_info}"') + def check_comment_info(self,comment_info=None,tb_type=''): + if tb_type == '' or tb_type == 'normal_table' or tb_type == 'child_table': + tdSql.query('show tables') + if comment_info == None: + tdSql.checkEqual(tdSql.queryResult[0][8],None) + else : + tdSql.checkEqual(tdSql.queryResult[0][8],comment_info) + elif tb_type == 'stable': + tdSql.query('show stables') + if comment_info == None: + tdSql.checkEqual(tdSql.queryResult[0][6],None) + else : + tdSql.checkEqual(tdSql.queryResult[0][6],comment_info) + def comment_check_case(self,table_type,comment_flag): + tdSql.prepare() + for tb in table_type: + for flag in comment_flag: + self.__set_and_alter_comment(tb,flag) + tdSql.execute('drop database db') + + def run(self): + self.comment_check_case(self.table_type_list,self.comment_flag_list) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/6-cluster/5dnode3mnodeDrop.py b/tests/system-test/6-cluster/5dnode3mnodeDrop.py index b98134f5e0..e81d5295f2 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeDrop.py +++ b/tests/system-test/6-cluster/5dnode3mnodeDrop.py @@ -75,7 +75,7 @@ class TDTestCase: testCluster = False valgrind = 0 hostname = socket.gethostname() - print(hostname) + tdLog.debug(hostname) dnodes = [] start_port = 6030 start_port_sec = 6130 @@ -102,12 +102,12 @@ class TDTestCase: # create cluster for dnode in self.TDDnodes.dnodes[1:]: - # print(dnode.cfgDict) + # tdLog.debug(dnode.cfgDict) dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"] dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0] dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1] cmd = f" taos -h {dnode_first_host} -P {dnode_first_port} -s ' create dnode \"{dnode_id} \" ' ;" - print(cmd) + tdLog.debug(cmd) os.system(cmd) time.sleep(2) @@ -119,26 +119,26 @@ class TDTestCase: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - print("mnode is three nodes") + tdLog.debug("mnode is three nodes") if tdSql.queryResult[0][2]=='leader' : if tdSql.queryResult[1][2]=='follower': if tdSql.queryResult[2][2]=='follower': - print("three mnodes is ready in 10s") + tdLog.debug("three mnodes is ready in 10s") break elif tdSql.queryResult[0][2]=='follower' : if tdSql.queryResult[1][2]=='leader': if tdSql.queryResult[2][2]=='follower': - print("three mnodes is ready in 10s") + tdLog.debug("three mnodes is ready in 10s") break elif tdSql.queryResult[0][2]=='follower' : if tdSql.queryResult[1][2]=='follower': if tdSql.queryResult[2][2]=='leader': - print("three mnodes is ready in 10s") + tdLog.debug("three mnodes is ready in 10s") break count+=1 else: - print(tdSql.queryResult) - print("three mnodes is not ready in 10s ") + tdLog.debug(tdSql.queryResult) + tdLog.debug("three mnodes is not ready in 10s ") return -1 tdSql.query("show mnodes;") @@ -156,19 +156,19 @@ class TDTestCase: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - print("mnode is three nodes") + tdLog.debug("mnode is three nodes") if tdSql.queryResult[0][2]=='offline' : if tdSql.queryResult[1][2]=='leader': if tdSql.queryResult[2][2]=='follower': - print("stop mnodes on dnode 2 successfully in 10s") + tdLog.debug("stop mnodes on dnode 2 successfully in 10s") break elif tdSql.queryResult[1][2]=='follower': if tdSql.queryResult[2][2]=='leader': - print("stop mnodes on dnode 2 successfully in 10s") + tdLog.debug("stop mnodes on dnode 2 successfully in 10s") break count+=1 else: - print("stop mnodes on dnode 2 failed in 10s ") + tdLog.debug("stop mnodes on dnode 2 failed in 10s ") return -1 tdSql.error("drop mnode on dnode 1;") @@ -188,15 +188,15 @@ class TDTestCase: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - print("mnode is three nodes") + tdLog.debug("mnode is three nodes") if tdSql.queryResult[0][2]=='leader' : if tdSql.queryResult[1][2]=='offline': if tdSql.queryResult[2][2]=='follower': - print("stop mnodes on dnode 2 successfully in 10s") + tdLog.debug("stop mnodes on dnode 2 successfully in 10s") break count+=1 else: - print("stop mnodes on dnode 2 failed in 10s ") + tdLog.debug("stop mnodes on dnode 2 failed in 10s ") return -1 tdSql.error("drop mnode on dnode 2;") @@ -218,15 +218,15 @@ class TDTestCase: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - print("mnode is three nodes") + tdLog.debug("mnode is three nodes") if tdSql.queryResult[0][2]=='leader' : if tdSql.queryResult[2][2]=='offline': if tdSql.queryResult[1][2]=='follower': - print("stop mnodes on dnode 3 successfully in 10s") + tdLog.debug("stop mnodes on dnode 3 successfully in 10s") break count+=1 else: - print("stop mnodes on dnode 3 failed in 10s") + tdLog.debug("stop mnodes on dnode 3 failed in 10s") return -1 tdSql.error("drop mnode on dnode 3;") tdSql.query("show mnodes;") @@ -268,7 +268,7 @@ class TDTestCase: tdSql.error("drop mnode on dnode 1") tdSql.query("show dnodes;") - print(tdSql.queryResult) + tdLog.debug(tdSql.queryResult) # drop follower of mnode dropcount =0 @@ -282,7 +282,7 @@ class TDTestCase: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(2): - print("drop mnode %d successfully"%(i+1)) + tdLog.debug("drop mnode %d successfully"%(i+1)) break count+=1 tdLog.debug("create mnode on dnode %d"%(i+1)) @@ -292,7 +292,7 @@ class TDTestCase: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3): - print("drop mnode %d successfully"%(i+1)) + tdLog.debug("drop mnode %d successfully"%(i+1)) break count+=1 dropcount+=1 @@ -307,7 +307,7 @@ class TDTestCase: def run(self): - # print(self.master_dnode.cfgDict) + # tdLog.debug(self.master_dnode.cfgDict) self.buildcluster(5) self.five_dnode_three_mnode() diff --git a/tests/system-test/6-cluster/5dnode3mnodeStop.py b/tests/system-test/6-cluster/5dnode3mnodeStop.py index e11f819263..654b27bfc0 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeStop.py +++ b/tests/system-test/6-cluster/5dnode3mnodeStop.py @@ -99,12 +99,12 @@ class TDTestCase: # create cluster for dnode in self.TDDnodes.dnodes[1:]: - # print(dnode.cfgDict) + # tdLog.debug(dnode.cfgDict) dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"] dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0] dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1] cmd = f" taos -h {dnode_first_host} -P {dnode_first_port} -s ' create dnode \"{dnode_id} \" ' ;" - print(cmd) + tdLog.debug(cmd) os.system(cmd) time.sleep(2) @@ -116,25 +116,25 @@ class TDTestCase: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - print("mnode is three nodes") + tdLog.debug("mnode is three nodes") if tdSql.queryResult[0][2]=='leader' : if tdSql.queryResult[1][2]=='follower': if tdSql.queryResult[2][2]=='follower': - print("three mnodes is ready in 10s") + tdLog.debug("three mnodes is ready in 10s") break elif tdSql.queryResult[0][2]=='follower' : if tdSql.queryResult[1][2]=='leader': if tdSql.queryResult[2][2]=='follower': - print("three mnodes is ready in 10s") + tdLog.debug("three mnodes is ready in 10s") break elif tdSql.queryResult[0][2]=='follower' : if tdSql.queryResult[1][2]=='follower': if tdSql.queryResult[2][2]=='leader': - print("three mnodes is ready in 10s") + tdLog.debug("three mnodes is ready in 10s") break count+=1 else: - print("three mnodes is not ready in 10s ") + tdLog.debug("three mnodes is not ready in 10s ") return -1 tdSql.query("show mnodes;") @@ -152,19 +152,19 @@ class TDTestCase: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - print("mnode is three nodes") + tdLog.debug("mnode is three nodes") if tdSql.queryResult[0][2]=='offline' : if tdSql.queryResult[1][2]=='leader': if tdSql.queryResult[2][2]=='follower': - print("stop mnodes on dnode 2 successfully in 10s") + tdLog.debug("stop mnodes on dnode 2 successfully in 10s") break elif tdSql.queryResult[1][2]=='follower': if tdSql.queryResult[2][2]=='leader': - print("stop mnodes on dnode 2 successfully in 10s") + tdLog.debug("stop mnodes on dnode 2 successfully in 10s") break count+=1 else: - print("stop mnodes on dnode 2 failed in 10s ") + tdLog.debug("stop mnodes on dnode 2 failed in 10s ") return -1 tdSql.error("drop mnode on dnode 1;") @@ -184,15 +184,15 @@ class TDTestCase: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - print("mnode is three nodes") + tdLog.debug("mnode is three nodes") if tdSql.queryResult[0][2]=='leader' : if tdSql.queryResult[1][2]=='offline': if tdSql.queryResult[2][2]=='follower': - print("stop mnodes on dnode 2 successfully in 10s") + tdLog.debug("stop mnodes on dnode 2 successfully in 10s") break count+=1 else: - print("stop mnodes on dnode 2 failed in 10s ") + tdLog.debug("stop mnodes on dnode 2 failed in 10s ") return -1 tdSql.error("drop mnode on dnode 2;") @@ -214,15 +214,15 @@ class TDTestCase: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - print("mnode is three nodes") + tdLog.debug("mnode is three nodes") if tdSql.queryResult[0][2]=='leader' : if tdSql.queryResult[2][2]=='offline': if tdSql.queryResult[1][2]=='follower': - print("stop mnodes on dnode 3 successfully in 10s") + tdLog.debug("stop mnodes on dnode 3 successfully in 10s") break count+=1 else: - print("stop mnodes on dnode 3 failed in 10s") + tdLog.debug("stop mnodes on dnode 3 failed in 10s") return -1 tdSql.error("drop mnode on dnode 3;") tdSql.query("show mnodes;") @@ -262,7 +262,7 @@ class TDTestCase: tdSql.error("create mnode on dnode 2") tdSql.query("show dnodes;") - print(tdSql.queryResult) + tdLog.debug(tdSql.queryResult) tdLog.debug("stop and follower of mnode") self.TDDnodes.stoptaosd(2) @@ -303,7 +303,7 @@ class TDTestCase: def run(self): - # print(self.master_dnode.cfgDict) + # tdLog.debug(self.master_dnode.cfgDict) self.buildcluster(5) self.five_dnode_three_mnode(5) diff --git a/tests/system-test/6-cluster/5dnode3mnodeStopInsert.py b/tests/system-test/6-cluster/5dnode3mnodeStopInsert.py index ce3f2c4093..a53930faac 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeStopInsert.py +++ b/tests/system-test/6-cluster/5dnode3mnodeStopInsert.py @@ -77,7 +77,7 @@ class TDTestCase: for couti in range(countstart,countstop): tdLog.debug("drop database if exists db%d" %couti) tdSql.execute("drop database if exists db%d" %couti) - print("create database if not exists db%d replica 1 duration 300" %couti) + tdLog.debug("create database if not exists db%d replica 1 duration 300" %couti) tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti) tdSql.execute("use db%d" %couti) tdSql.execute( @@ -126,12 +126,12 @@ class TDTestCase: # create cluster for dnode in self.TDDnodes.dnodes[1:]: - # print(dnode.cfgDict) + # tdLog.debug(dnode.cfgDict) dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"] dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0] dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1] cmd = f" taos -h {dnode_first_host} -P {dnode_first_port} -s ' create dnode \"{dnode_id} \" ' ;" - print(cmd) + tdLog.debug(cmd) os.system(cmd) time.sleep(2) @@ -144,22 +144,22 @@ class TDTestCase: statusReadyBumber=0 tdSql.query("show dnodes;") if tdSql.checkRows(dnodenumber) : - print("dnode is %d nodes"%dnodenumber) + tdLog.debug("dnode is %d nodes"%dnodenumber) for i in range(dnodenumber): if tdSql.queryResult[i][4] !='ready' : status=tdSql.queryResult[i][4] - print("dnode:%d status is %s "%(i,status)) + tdLog.debug("dnode:%d status is %s "%(i,status)) break else: statusReadyBumber+=1 - print(statusReadyBumber) + tdLog.debug(statusReadyBumber) if statusReadyBumber == dnodenumber : - print("all of %d mnodes is ready in 10s "%dnodenumber) + tdLog.debug("all of %d mnodes is ready in 10s "%dnodenumber) return True break count+=1 else: - print("%d mnodes is not ready in 10s "%dnodenumber) + tdLog.debug("%d mnodes is not ready in 10s "%dnodenumber) return False @@ -169,25 +169,25 @@ class TDTestCase: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - print("mnode is three nodes") + tdLog.debug("mnode is three nodes") if tdSql.queryResult[0][2]=='leader' : if tdSql.queryResult[1][2]=='follower': if tdSql.queryResult[2][2]=='follower': - print("three mnodes is ready in 10s") + tdLog.debug("three mnodes is ready in 10s") break elif tdSql.queryResult[0][2]=='follower' : if tdSql.queryResult[1][2]=='leader': if tdSql.queryResult[2][2]=='follower': - print("three mnodes is ready in 10s") + tdLog.debug("three mnodes is ready in 10s") break elif tdSql.queryResult[0][2]=='follower' : if tdSql.queryResult[1][2]=='follower': if tdSql.queryResult[2][2]=='leader': - print("three mnodes is ready in 10s") + tdLog.debug("three mnodes is ready in 10s") break count+=1 else: - print("three mnodes is not ready in 10s ") + tdLog.debug("three mnodes is not ready in 10s ") return -1 tdSql.query("show mnodes;") @@ -205,19 +205,19 @@ class TDTestCase: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - print("mnode is three nodes") + tdLog.debug("mnode is three nodes") if tdSql.queryResult[0][2]=='offline' : if tdSql.queryResult[1][2]=='leader': if tdSql.queryResult[2][2]=='follower': - print("stop mnodes on dnode 2 successfully in 10s") + tdLog.debug("stop mnodes on dnode 2 successfully in 10s") break elif tdSql.queryResult[1][2]=='follower': if tdSql.queryResult[2][2]=='leader': - print("stop mnodes on dnode 2 successfully in 10s") + tdLog.debug("stop mnodes on dnode 2 successfully in 10s") break count+=1 else: - print("stop mnodes on dnode 2 failed in 10s ") + tdLog.debug("stop mnodes on dnode 2 failed in 10s ") return -1 tdSql.error("drop mnode on dnode 1;") @@ -237,15 +237,15 @@ class TDTestCase: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - print("mnode is three nodes") + tdLog.debug("mnode is three nodes") if tdSql.queryResult[0][2]=='leader' : if tdSql.queryResult[1][2]=='offline': if tdSql.queryResult[2][2]=='follower': - print("stop mnodes on dnode 2 successfully in 10s") + tdLog.debug("stop mnodes on dnode 2 successfully in 10s") break count+=1 else: - print("stop mnodes on dnode 2 failed in 10s ") + tdLog.debug("stop mnodes on dnode 2 failed in 10s ") return -1 tdSql.error("drop mnode on dnode 2;") @@ -267,15 +267,15 @@ class TDTestCase: time.sleep(1) tdSql.query("show mnodes;") if tdSql.checkRows(3) : - print("mnode is three nodes") + tdLog.debug("mnode is three nodes") if tdSql.queryResult[0][2]=='leader' : if tdSql.queryResult[2][2]=='offline': if tdSql.queryResult[1][2]=='follower': - print("stop mnodes on dnode 3 successfully in 10s") + tdLog.debug("stop mnodes on dnode 3 successfully in 10s") break count+=1 else: - print("stop mnodes on dnode 3 failed in 10s") + tdLog.debug("stop mnodes on dnode 3 failed in 10s") return -1 tdSql.error("drop mnode on dnode 3;") tdSql.query("show mnodes;") @@ -311,7 +311,7 @@ class TDTestCase: tdSql.error("create mnode on dnode 2") tdSql.query("show dnodes;") - print(tdSql.queryResult) + tdLog.debug(tdSql.queryResult) tdLog.debug("stop all of mnode ") stopcount =0 @@ -325,10 +325,10 @@ class TDTestCase: self.TDDnodes.starttaosd(i+1) if self.checkdnodes(5): - print("123") + tdLog.debug("123") threads.join() else: - print("456") + tdLog.debug("456") self.stop_thread(threads) assert 1 == 2 ,"some dnode started failed" return False @@ -345,7 +345,7 @@ class TDTestCase: def run(self): - # print(self.master_dnode.cfgDict) + # tdLog.debug(self.master_dnode.cfgDict) self.buildcluster(5) self.five_dnode_three_mnode(5) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 41004cc5a2..db3f7216ab 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -22,7 +22,7 @@ python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py python3 ./test.py -f 1-insert/alter_stable.py python3 ./test.py -f 1-insert/alter_table.py python3 ./test.py -f 1-insert/insertWithMoreVgroup.py -# python3 ./test.py -f 1-inerst/create_table_comment.py +python3 ./test.py -f 1-insert/table_comment.py python3 ./test.py -f 2-query/between.py python3 ./test.py -f 2-query/distinct.py python3 ./test.py -f 2-query/varchar.py diff --git a/tests/test/c/sdbDump.c b/tests/test/c/sdbDump.c index 612b870b7e..a62c30660d 100644 --- a/tests/test/c/sdbDump.c +++ b/tests/test/c/sdbDump.c @@ -114,8 +114,10 @@ void dumpStb(SSdb *pSdb, SJson *json) { tjsonAddIntegerToObject(item, "tagVer", pObj->tagVer); tjsonAddIntegerToObject(item, "colVer", pObj->colVer); tjsonAddIntegerToObject(item, "nextColId", pObj->nextColId); - tjsonAddIntegerToObject(item, "xFilesFactor", pObj->xFilesFactor * 10000); - tjsonAddIntegerToObject(item, "delay", pObj->delay); + tjsonAddIntegerToObject(item, "watermark1", pObj->watermark[0]); + tjsonAddIntegerToObject(item, "watermark2", pObj->watermark[1]); + tjsonAddIntegerToObject(item, "maxdelay1", pObj->maxdelay[0]); + tjsonAddIntegerToObject(item, "maxdelay2", pObj->maxdelay[1]); tjsonAddIntegerToObject(item, "ttl", pObj->ttl); tjsonAddIntegerToObject(item, "numOfColumns", pObj->numOfColumns); tjsonAddIntegerToObject(item, "numOfTags", pObj->numOfTags);