diff --git a/deps/arm/dm_static/libdmodule.a b/deps/arm/dm_static/libdmodule.a index dbad0112cf..37077ef63b 100644 Binary files a/deps/arm/dm_static/libdmodule.a and b/deps/arm/dm_static/libdmodule.a differ diff --git a/deps/darwin/arm/dm_static/libdmodule.a b/deps/darwin/arm/dm_static/libdmodule.a index 2aab587b18..246b2247af 100644 Binary files a/deps/darwin/arm/dm_static/libdmodule.a and b/deps/darwin/arm/dm_static/libdmodule.a differ diff --git a/deps/darwin/x64/dm_static/libdmodule.a b/deps/darwin/x64/dm_static/libdmodule.a index 1fb6794f65..8745f57636 100644 Binary files a/deps/darwin/x64/dm_static/libdmodule.a and b/deps/darwin/x64/dm_static/libdmodule.a differ diff --git a/deps/mips/dm_static/libdmodule.a b/deps/mips/dm_static/libdmodule.a index d4b0582498..855a6a41d9 100644 Binary files a/deps/mips/dm_static/libdmodule.a and b/deps/mips/dm_static/libdmodule.a differ diff --git a/deps/x86/dm_static/libdmodule.a b/deps/x86/dm_static/libdmodule.a index 9d37818a79..6a3c0d45c2 100644 Binary files a/deps/x86/dm_static/libdmodule.a and b/deps/x86/dm_static/libdmodule.a differ diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 469bc6227e..93f17fa887 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -219,7 +219,6 @@ extern bool tsFilterScalarMode; extern int32_t tsMaxStreamBackendCache; extern int32_t tsPQSortMemThreshold; extern int32_t tsResolveFQDNRetryTime; -extern bool tsDisableCount; extern bool tsExperimental; // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) @@ -234,10 +233,10 @@ int32_t taosCfgDynamicOptions(SConfig *pCfg, char *name, bool forServer); struct SConfig *taosGetCfg(); -void taosSetAllDebugFlag(int32_t flag); -void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal); -void taosLocalCfgForbiddenToChange(char *name, bool *forbidden); -int8_t taosGranted(int8_t type); +void taosSetGlobalDebugFlag(int32_t flag); +void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal); +void taosLocalCfgForbiddenToChange(char *name, bool *forbidden); +int8_t taosGranted(int8_t type); #ifdef __cplusplus } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 27202fa51d..d3b1b76717 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -433,7 +433,7 @@ int32_t* taosGetErrno(); //mnode-compact #define TSDB_CODE_MND_INVALID_COMPACT_ID TAOS_DEF_ERROR_CODE(0, 0x04B1) - +#define TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x04B2) // vnode // #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) // 2.x diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 9f09bd2930..eb887596d0 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -72,40 +72,6 @@ struct STaosQnode { char item[]; }; -struct STaosQueue { - STaosQnode *head; - STaosQnode *tail; - STaosQueue *next; // for queue set - STaosQset *qset; // for queue set - void *ahandle; // for queue set - FItem itemFp; - FItems itemsFp; - TdThreadMutex mutex; - int64_t memOfItems; - int32_t numOfItems; - int64_t threadId; - int64_t memLimit; - int64_t itemLimit; -}; - -struct STaosQset { - STaosQueue *head; - STaosQueue *current; - TdThreadMutex mutex; - tsem_t sem; - int32_t numOfQueues; - int32_t numOfItems; -}; - -struct STaosQall { - STaosQnode *current; - STaosQnode *start; - int32_t numOfItems; - int64_t memOfItems; - int32_t unAccessedNumOfItems; - int64_t unAccessMemOfItems; -}; - STaosQueue *taosOpenQueue(); void taosCloseQueue(STaosQueue *queue); void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); @@ -140,6 +106,8 @@ int32_t taosGetQueueNumber(STaosQset *qset); int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo); int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo); void taosResetQsetThread(STaosQset *qset, void *pItem); +void taosQueueSetThreadId(STaosQueue *pQueue, int64_t threadId); +int64_t taosQueueGetThreadId(STaosQueue *pQueue); #ifdef __cplusplus } diff --git a/include/util/tscalablebf.h b/include/util/tscalablebf.h index 2cf170cf04..d3ce2eb23b 100644 --- a/include/util/tscalablebf.h +++ b/include/util/tscalablebf.h @@ -26,6 +26,8 @@ typedef struct SScalableBf { SArray *bfArray; // array of bloom filters uint32_t growth; uint64_t numBits; + uint32_t maxBloomFilters; + int8_t status; _hash_fn_t hashFn1; _hash_fn_t hashFn2; } SScalableBf; diff --git a/source/client/inc/clientSml.h b/source/client/inc/clientSml.h index b732abffb1..122914fd34 100644 --- a/source/client/inc/clientSml.h +++ b/source/client/inc/clientSml.h @@ -80,7 +80,7 @@ extern "C" { #define IS_SAME_KEY (maxKV->type == kv->type && maxKV->keyLen == kv->keyLen && memcmp(maxKV->key, kv->key, kv->keyLen) == 0) #define IS_SLASH_LETTER_IN_MEASUREMENT(sql) \ - (*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE)) + (*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE || *(sql) == SLASH)) #define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len)) diff --git a/source/client/src/clientSmlLine.c b/source/client/src/clientSmlLine.c index 0c610a4611..7535cbfd0c 100644 --- a/source/client/src/clientSmlLine.c +++ b/source/client/src/clientSmlLine.c @@ -20,14 +20,14 @@ #include "clientSml.h" -#define IS_COMMA(sql) (*(sql) == COMMA && *((sql)-1) != SLASH) -#define IS_SPACE(sql) (*(sql) == SPACE && *((sql)-1) != SLASH) -#define IS_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) != SLASH) +#define IS_COMMA(sql,escapeChar) (*(sql) == COMMA && (*((sql)-1) != SLASH || ((sql)-1 == escapeChar))) +#define IS_SPACE(sql,escapeChar) (*(sql) == SPACE && (*((sql)-1) != SLASH || ((sql)-1 == escapeChar))) +#define IS_EQUAL(sql,escapeChar) (*(sql) == EQUAL && (*((sql)-1) != SLASH || ((sql)-1 == escapeChar))) #define IS_SLASH_LETTER_IN_FIELD_VALUE(sql) (*((sql)-1) == SLASH && (*(sql) == QUOTE || *(sql) == SLASH)) #define IS_SLASH_LETTER_IN_TAG_FIELD_KEY(sql) \ - (*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE || *(sql) == EQUAL)) + (*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE || *(sql) == EQUAL || *(sql) == SLASH)) #define PROCESS_SLASH_IN_FIELD_VALUE(key, keyLen) \ for (int i = 1; i < keyLen; ++i) { \ @@ -198,7 +198,7 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){ int cnt = 0; while (*sql < sqlEnd) { - if (unlikely(IS_SPACE(*sql))) { + if (unlikely(IS_SPACE(*sql,NULL))) { break; } @@ -207,18 +207,21 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){ size_t keyLen = 0; bool keyEscaped = false; size_t keyLenEscaped = 0; + const char *escapeChar = NULL; + while (*sql < sqlEnd) { - if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) { + if (unlikely(IS_SPACE(*sql,escapeChar) || IS_COMMA(*sql,escapeChar))) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql); terrno = TSDB_CODE_SML_INVALID_DATA; return -1; } - if (unlikely(IS_EQUAL(*sql))) { + if (unlikely(IS_EQUAL(*sql,escapeChar))) { keyLen = *sql - key; (*sql)++; break; } if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(*sql)) { + escapeChar = *sql; keyLenEscaped++; keyEscaped = true; } @@ -238,15 +241,16 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){ size_t valueLenEscaped = 0; while (*sql < sqlEnd) { // parse value - if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) { + if (unlikely(IS_SPACE(*sql,escapeChar) || IS_COMMA(*sql,escapeChar))) { break; - } else if (unlikely(IS_EQUAL(*sql))) { + } else if (unlikely(IS_EQUAL(*sql,escapeChar))) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql); terrno = TSDB_CODE_SML_INVALID_DATA; return -1; } if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(*sql)) { + escapeChar = *sql; valueLenEscaped++; valueEscaped = true; } @@ -293,7 +297,7 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){ } cnt++; - if (IS_SPACE(*sql)) { + if (IS_SPACE(*sql,escapeChar)) { break; } (*sql)++; @@ -326,7 +330,7 @@ static int32_t smlParseTagLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *currElement) { int cnt = 0; while (*sql < sqlEnd) { - if (unlikely(IS_SPACE(*sql))) { + if (unlikely(IS_SPACE(*sql,NULL))) { break; } @@ -335,17 +339,19 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL size_t keyLen = 0; bool keyEscaped = false; size_t keyLenEscaped = 0; + const char *escapeChar = NULL; while (*sql < sqlEnd) { - if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) { + if (unlikely(IS_SPACE(*sql,escapeChar) || IS_COMMA(*sql,escapeChar))) { smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql); return TSDB_CODE_SML_INVALID_DATA; } - if (unlikely(IS_EQUAL(*sql))) { + if (unlikely(IS_EQUAL(*sql,escapeChar))) { keyLen = *sql - key; (*sql)++; break; } if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(*sql)) { + escapeChar = *sql; keyLenEscaped++; keyEscaped = true; } @@ -363,7 +369,6 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL bool valueEscaped = false; size_t valueLenEscaped = 0; int quoteNum = 0; - const char *escapeChar = NULL; while (*sql < sqlEnd) { // parse value if (unlikely(*(*sql) == QUOTE && (*(*sql - 1) != SLASH || (*sql - 1) == escapeChar))) { @@ -374,7 +379,7 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL } continue; } - if (quoteNum % 2 == 0 && (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql)))) { + if (quoteNum % 2 == 0 && (unlikely(IS_SPACE(*sql,escapeChar) || IS_COMMA(*sql,escapeChar)))) { break; } if (IS_SLASH_LETTER_IN_FIELD_VALUE(*sql) && (*sql - 1) != escapeChar) { @@ -437,7 +442,7 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL } cnt++; - if (IS_SPACE(*sql)) { + if (IS_SPACE(*sql,escapeChar)) { break; } (*sql)++; @@ -453,19 +458,18 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine elements->measure = sql; // parse measure size_t measureLenEscaped = 0; + const char *escapeChar = NULL; while (sql < sqlEnd) { - if (unlikely((sql != elements->measure) && IS_SLASH_LETTER_IN_MEASUREMENT(sql))) { - elements->measureEscaped = true; - measureLenEscaped++; - sql++; - continue; - } - if (unlikely(IS_COMMA(sql))) { + if (unlikely(IS_COMMA(sql,escapeChar) || IS_SPACE(sql,escapeChar))) { break; } - if (unlikely(IS_SPACE(sql))) { - break; + if (unlikely((sql != elements->measure) && IS_SLASH_LETTER_IN_MEASUREMENT(sql))) { + elements->measureEscaped = true; + escapeChar = sql; + measureLenEscaped++; + sql++; + continue; } sql++; } @@ -478,9 +482,12 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine // to get measureTagsLen before const char *tmp = sql; while (tmp < sqlEnd) { - if (unlikely(IS_SPACE(tmp))) { + if (unlikely(IS_SPACE(tmp,escapeChar))) { break; } + if(unlikely(IS_SLASH_LETTER_IN_TAG_FIELD_KEY(tmp))){ + escapeChar = tmp; + } tmp++; } elements->measureTagsLen = tmp - elements->measure; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 9b74456da2..a893b27896 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -876,12 +876,13 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { STaosQall* qall = taosAllocateQall(); taosReadAllQitems(pTmq->delayedTask, qall); - if (qall->numOfItems == 0) { + int32_t numOfItems = taosQallItemSize(qall); + if (numOfItems == 0) { taosFreeQall(qall); return TSDB_CODE_SUCCESS; } - tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems); + tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems); int8_t* pTaskType = NULL; taosGetQitem(qall, (void**)&pTaskType); @@ -1839,7 +1840,7 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal } static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { - tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems); + tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall)); while (1) { SMqRspWrapper* pRspWrapper = NULL; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3d8c7c7f8b..c5a26c5c10 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -269,7 +269,6 @@ int64_t tsStreamBufferSize = 128 * 1024 * 1024; bool tsFilterScalarMode = false; int tsResolveFQDNRetryTime = 100; // seconds int tsStreamAggCnt = 1000; -bool tsDisableCount = true; char tsS3Endpoint[TSDB_FQDN_LEN] = ""; char tsS3AccessKey[TSDB_FQDN_LEN] = ""; @@ -541,8 +540,6 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - - if (cfgAddBool(pCfg, "disableCount", tsDisableCount, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; return 0; } @@ -705,7 +702,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "monitorIntervalForBasic", tsMonitorIntervalForBasic, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "monitorForceV2", tsMonitorForceV2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - + if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; @@ -1109,8 +1106,6 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsKeepAliveIdle = cfgGetItem(pCfg, "keepAliveIdle")->i32; tsExperimental = cfgGetItem(pCfg, "experimental")->bval; - - tsDisableCount = cfgGetItem(pCfg, "disableCount")->bval; return 0; } @@ -1174,7 +1169,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsMonitorLogProtocol = cfgGetItem(pCfg, "monitorLogProtocol")->bval; tsMonitorIntervalForBasic = cfgGetItem(pCfg, "monitorIntervalForBasic")->i32; tsMonitorForceV2 = cfgGetItem(pCfg, "monitorForceV2")->i32; - + tsEnableAudit = cfgGetItem(pCfg, "audit")->bval; tsEnableAuditCreateTable = cfgGetItem(pCfg, "auditCreateTable")->bval; tsAuditInterval = cfgGetItem(pCfg, "auditInterval")->i32; @@ -1263,6 +1258,8 @@ static int32_t taosSetReleaseCfg(SConfig *pCfg) { return 0; } int32_t taosSetReleaseCfg(SConfig *pCfg); #endif +static void taosSetAllDebugFlag(SConfig *pCfg, int32_t flag); + int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) { if (tsCfg == NULL) osDefaultInit(); @@ -1307,7 +1304,7 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi taosSetServerLogCfg(pCfg); } - taosSetAllDebugFlag(cfgGetItem(pCfg, "debugFlag")->i32); + taosSetAllDebugFlag(pCfg, cfgGetItem(pCfg, "debugFlag")->i32); if (taosMulModeMkDir(tsLogDir, 0777, true) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -1385,7 +1382,7 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile taosSetSystemCfg(tsCfg); if (taosSetFileHandlesLimit() != 0) return -1; - taosSetAllDebugFlag(cfgGetItem(tsCfg, "debugFlag")->i32); + taosSetAllDebugFlag(tsCfg, cfgGetItem(tsCfg, "debugFlag")->i32); cfgDumpCfg(tsCfg, tsc, false); @@ -1478,7 +1475,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) { } if (strncasecmp(name, "debugFlag", 9) == 0) { - taosSetAllDebugFlag(pItem->i32); + taosSetAllDebugFlag(pCfg, pItem->i32); return 0; } @@ -1552,7 +1549,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { switch (lowcaseName[0]) { case 'd': { if (strcasecmp("debugFlag", name) == 0) { - taosSetAllDebugFlag(pItem->i32); + taosSetAllDebugFlag(pCfg, pItem->i32); matched = true; } break; @@ -1737,8 +1734,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { {"shellActivityTimer", &tsShellActivityTimer}, {"slowLogThreshold", &tsSlowLogThreshold}, {"useAdapter", &tsUseAdapter}, - {"experimental", &tsExperimental}, - {"disableCount", &tsDisableCount}}; + {"experimental", &tsExperimental}}; if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) { taosCfgSetOption(options, tListLen(options), pItem, false); @@ -1777,11 +1773,13 @@ static void taosCheckAndSetDebugFlag(int32_t *pFlagPtr, char *name, int32_t flag taosSetDebugFlag(pFlagPtr, name, flag); } -void taosSetAllDebugFlag(int32_t flag) { +void taosSetGlobalDebugFlag(int32_t flag) { taosSetAllDebugFlag(tsCfg, flag); } + +static void taosSetAllDebugFlag(SConfig *pCfg, int32_t flag) { if (flag <= 0) return; SArray *noNeedToSetVars = NULL; - SConfigItem *pItem = cfgGetItem(tsCfg, "debugFlag"); + SConfigItem *pItem = cfgGetItem(pCfg, "debugFlag"); if (pItem != NULL) { pItem->i32 = flag; noNeedToSetVars = pItem->array; @@ -1831,4 +1829,4 @@ int8_t taosGranted(int8_t type) { break; } return 0; -} \ No newline at end of file +} diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4640c20e07..a1d279b494 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5238,11 +5238,11 @@ int32_t tDeserializeSQueryCompactProgressRsp(void *buf, int32_t bufLen, SQueryCo if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->compactId) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->numberFileset) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->finished) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->compactId) < 0) return -2; + if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -3; + if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -4; + if (tDecodeI32(&decoder, &pReq->numberFileset) < 0) return -5; + if (tDecodeI32(&decoder, &pReq->finished) < 0) return -6; tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index 1508d88def..f9456f1729 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -68,7 +68,7 @@ static struct { int64_t startTime; } global = {0}; -static void dmSetDebugFlag(int32_t signum, void *sigInfo, void *context) { taosSetAllDebugFlag(143); } +static void dmSetDebugFlag(int32_t signum, void *sigInfo, void *context) { taosSetGlobalDebugFlag(143); } static void dmSetAssert(int32_t signum, void *sigInfo, void *context) { tsAssert = 1; } static void dmStopDnode(int signum, void *sigInfo, void *context) { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index be88e8b3fd..3dfc2bd96f 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -194,26 +194,26 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) while (pVnode->refCount > 0) taosMsleep(10); dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue, - pVnode->pWriteW.queue->threadId); + taosQueueGetThreadId(pVnode->pWriteW.queue)); tMultiWorkerCleanup(&pVnode->pWriteW); dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue, - pVnode->pSyncW.queue->threadId); + taosQueueGetThreadId(pVnode->pSyncW.queue)); tMultiWorkerCleanup(&pVnode->pSyncW); dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue, - pVnode->pSyncRdW.queue->threadId); + taosQueueGetThreadId(pVnode->pSyncRdW.queue)); tMultiWorkerCleanup(&pVnode->pSyncRdW); dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue, - pVnode->pApplyW.queue->threadId); + taosQueueGetThreadId(pVnode->pApplyW.queue)); tMultiWorkerCleanup(&pVnode->pApplyW); dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ); while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ, - pVnode->pFetchQ->threadId); + taosQueueGetThreadId(pVnode->pFetchQ)); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); tqNotifyClose(pVnode->pImpl->pTq); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 8b80527447..a6abe5ab4d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -365,16 +365,16 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { } dInfo("vgId:%d, write-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue, - pVnode->pWriteW.queue->threadId); + taosQueueGetThreadId(pVnode->pWriteW.queue)); dInfo("vgId:%d, sync-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue, - pVnode->pSyncW.queue->threadId); + taosQueueGetThreadId(pVnode->pSyncW.queue)); dInfo("vgId:%d, sync-rd-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue, - pVnode->pSyncRdW.queue->threadId); + taosQueueGetThreadId(pVnode->pSyncRdW.queue)); dInfo("vgId:%d, apply-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue, - pVnode->pApplyW.queue->threadId); + taosQueueGetThreadId(pVnode->pApplyW.queue)); dInfo("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ); dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ, - pVnode->pFetchQ->threadId); + taosQueueGetThreadId(pVnode->pFetchQ)); dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ); return 0; } diff --git a/source/dnode/mgmt/node_mgmt/CMakeLists.txt b/source/dnode/mgmt/node_mgmt/CMakeLists.txt index 15c1d2fa4d..0cdc68345a 100644 --- a/source/dnode/mgmt/node_mgmt/CMakeLists.txt +++ b/source/dnode/mgmt/node_mgmt/CMakeLists.txt @@ -14,6 +14,10 @@ IF (TD_STORAGE) ENDIF () +IF (DEFINED GRANT_CFG_INCLUDE_DIR) + add_definitions(-DGRANTS_CFG) +ENDIF() + target_include_directories( dnode PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index a172756aad..ceaf086dc1 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -26,6 +26,10 @@ target_link_libraries( mnode scheduler sdb wal transport cjson sync monitor executor qworker stream parser audit monitorfw ) +IF (DEFINED GRANT_CFG_INCLUDE_DIR) + add_definitions(-DGRANTS_CFG) +ENDIF() + IF (TD_GRANT) TARGET_LINK_LIBRARIES(mnode grant) ADD_DEFINITIONS(-D_GRANT) diff --git a/source/dnode/mnode/impl/src/mndCompact.c b/source/dnode/mnode/impl/src/mndCompact.c index 4e71684372..2d714596a9 100644 --- a/source/dnode/mnode/impl/src/mndCompact.c +++ b/source/dnode/mnode/impl/src/mndCompact.c @@ -240,7 +240,7 @@ int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompac SSdbRaw *pVgRaw = mndCompactActionEncode(pCompact); if (pVgRaw == NULL) return -1; - if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) { + if (mndTransAppendPrepareLog(pTrans, pVgRaw) != 0) { sdbFreeRaw(pVgRaw); return -1; } @@ -363,13 +363,15 @@ static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *p } static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompact) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "kill-compact"); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-compact"); if (pTrans == NULL) { mError("compact:%" PRId32 ", failed to drop since %s" , pCompact->compactId, terrstr()); return -1; } mInfo("trans:%d, used to kill compact:%" PRId32, pTrans->id, pCompact->compactId); + mndTransSetDbName(pTrans, pCompact->dbname, NULL); + SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); @@ -378,7 +380,7 @@ static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompa } (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - void *pIter = NULL; + void *pIter = NULL; while (1) { SCompactDetailObj *pDetail = NULL; pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); @@ -488,13 +490,17 @@ static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t c sdbRelease(pMnode->pSdb, pDetail); } - return -1; + return TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST; } int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){ SQueryCompactProgressRsp req = {0}; - if (tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req) != 0) { + int32_t code = 0; + code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req); + if (code != 0) { terrno = TSDB_CODE_INVALID_MSG; + mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d", + code, pReq->pCont, pReq->contLen); return -1; } @@ -502,10 +508,10 @@ int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){ req.compactId, req.vgId, req.dnodeId, req.numberFileset, req.finished); SMnode *pMnode = pReq->info.node; - int32_t code = -1; - - if(mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req) != 0){ + code = mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req); + if(code != 0){ + terrno = code; mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId, req.vgId, req.dnodeId, req.numberFileset, req.finished); return -1; @@ -612,15 +618,17 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { return 0; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-compact-progress"); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-compact-progress"); if (pTrans == NULL) { mError("trans:%" PRId32 ", failed to create since %s" , pTrans->id, terrstr()); return -1; } mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id); - + SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId); + mndTransSetDbName(pTrans, pCompact->dbname, NULL); + pIter = NULL; while (1) { SCompactDetailObj *pDetail = NULL; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index e1f76b3a25..63ca7251f5 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -484,22 +484,26 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC snprintf(pVnode->monitor.strDnodeId, TSDB_NODE_ID_LEN, "%"PRId32, pVnode->config.syncCfg.nodeInfo[0].nodeId); snprintf(pVnode->monitor.strVgId, TSDB_VGROUP_ID_LEN, "%"PRId32, pVnode->config.vgId); - if(pVnode->monitor.insertCounter == NULL){ - int32_t label_count = 7; - const char *sample_labels[] = {VNODE_METRIC_TAG_NAME_SQL_TYPE, VNODE_METRIC_TAG_NAME_CLUSTER_ID, - VNODE_METRIC_TAG_NAME_DNODE_ID, VNODE_METRIC_TAG_NAME_DNODE_EP, - VNODE_METRIC_TAG_NAME_VGROUP_ID, VNODE_METRIC_TAG_NAME_USERNAME, - VNODE_METRIC_TAG_NAME_RESULT}; - taos_counter_t *counter = taos_counter_new(VNODE_METRIC_SQL_COUNT, "counter for insert sql", - label_count, sample_labels); - vInfo("vgId:%d, new metric:%p",TD_VID(pVnode), counter); - if(taos_collector_registry_register_metric(counter) == 1){ - taos_counter_destroy(counter); - counter = taos_collector_registry_get_metric(VNODE_METRIC_SQL_COUNT); - vInfo("vgId:%d, get metric from registry:%p",TD_VID(pVnode), counter); + if(tsEnableMonitor && pVnode->monitor.insertCounter == NULL){ + taos_counter_t *counter = NULL; + counter = taos_collector_registry_get_metric(VNODE_METRIC_SQL_COUNT); + if(counter == NULL){ + int32_t label_count = 7; + const char *sample_labels[] = {VNODE_METRIC_TAG_NAME_SQL_TYPE, VNODE_METRIC_TAG_NAME_CLUSTER_ID, + VNODE_METRIC_TAG_NAME_DNODE_ID, VNODE_METRIC_TAG_NAME_DNODE_EP, + VNODE_METRIC_TAG_NAME_VGROUP_ID, VNODE_METRIC_TAG_NAME_USERNAME, + VNODE_METRIC_TAG_NAME_RESULT}; + counter = taos_counter_new(VNODE_METRIC_SQL_COUNT, "counter for insert sql", + label_count, sample_labels); + vInfo("vgId:%d, new metric:%p",TD_VID(pVnode), counter); + if(taos_collector_registry_register_metric(counter) == 1){ + taos_counter_destroy(counter); + counter = taos_collector_registry_get_metric(VNODE_METRIC_SQL_COUNT); + vInfo("vgId:%d, get metric from registry:%p",TD_VID(pVnode), counter); + } } pVnode->monitor.insertCounter = counter; - vInfo("vgId:%d, succeed to set metric:%p",TD_VID(pVnode), counter); + vInfo("vgId:%d, succeed to set metric:%p",TD_VID(pVnode), counter); } return pVnode; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 52d75f9ab5..e32d4b70e0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -16,6 +16,7 @@ #include "audit.h" #include "cos.h" #include "tencode.h" +#include "tglobal.h" #include "tmsg.h" #include "tstrbuild.h" #include "vnd.h" @@ -1708,7 +1709,7 @@ _exit: atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows); atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1); - if(pSubmitRsp->affectedRows > 0 && strlen(pOriginalMsg->info.conn.user) > 0){ + if(tsEnableMonitor && pSubmitRsp->affectedRows > 0 && strlen(pOriginalMsg->info.conn.user) > 0){ const char *sample_labels[] = {VNODE_METRIC_TAG_VALUE_INSERT_AFFECTED_ROWS, pVnode->monitor.strClusterId, pVnode->monitor.strDnodeId, tsLocalEp, pVnode->monitor.strVgId, pOriginalMsg->info.conn.user, "Success"}; diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 1d2a55fac8..1f38264644 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -94,10 +94,10 @@ int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) { int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; i < pBlock->info.rows;) { - int32_t step = pInfo->windowSliding; SCountWindowResult* pBuffInfo = setCountWindowOutputBuff(pExprSup, &pInfo->countSup, &pInfo->pRow); int32_t prevRows = pBuffInfo->winRows; int32_t num = updateCountWindowInfo(i, pBlock->info.rows, pInfo->windowCount, &pBuffInfo->winRows); + int32_t step = num; if (prevRows == 0) { pInfo->pRow->win.skey = tsCols[i]; } @@ -118,6 +118,8 @@ int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) { if (prevRows <= pInfo->windowSliding) { if (pBuffInfo->winRows > pInfo->windowSliding) { step = pInfo->windowSliding - prevRows; + } else { + step = pInfo->windowSliding; } } else { step = 0; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 2534c5e9f0..831fd4e883 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1009,6 +1009,22 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { pSup->deleteMark = INT64_MAX; pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData; pInfo->ignoreExpiredData = false; + } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT) { + SStreamCountAggOperatorInfo* pInfo = pOperator->info; + STimeWindowAggSupp* pSup = &pInfo->twAggSup; + + ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); + ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); + + qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); + + pSup->calTriggerSaved = pSup->calTrigger; + pSup->deleteMarkSaved = pSup->deleteMark; + pSup->calTrigger = STREAM_TRIGGER_AT_ONCE; + pSup->deleteMark = INT64_MAX; + pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData; + pInfo->ignoreExpiredData = false; + qInfo("save stream task:%s, param for state: %d", GET_TASKID(pTaskInfo), pInfo->ignoreExpiredData); } // iterate operator tree diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8cc2f72adb..9024f7a341 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3763,8 +3763,9 @@ static int32_t stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) { taosMemoryFree(pSubTblsInfo); pInfo->pSubTablesMergeInfo = NULL; + + taosMemoryTrim(0); } - taosMemoryTrim(0); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/index/src/indexComm.c b/source/libs/index/src/indexComm.c index 1313221952..b7b9f1cc9f 100644 --- a/source/libs/index/src/indexComm.c +++ b/source/libs/index/src/indexComm.c @@ -76,8 +76,8 @@ char* idxInt2str(int64_t val, char* dst, int radix) { return dst - 1; } __compar_fn_t idxGetCompar(int8_t type) { - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || - type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_GEOMETRY) { + if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_NCHAR || + type == TSDB_DATA_TYPE_GEOMETRY) { return (__compar_fn_t)strcmp; } return getComparFunc(type, 0); @@ -108,8 +108,8 @@ static FORCE_INLINE TExeCond tCompareEqual(void* a, void* b, int8_t type) { return tCompare(func, QUERY_TERM, a, b, type); } TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t dtype) { - if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR || - dtype == TSDB_DATA_TYPE_VARBINARY || dtype == TSDB_DATA_TYPE_GEOMETRY) { + if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR || dtype == TSDB_DATA_TYPE_VARBINARY || + dtype == TSDB_DATA_TYPE_GEOMETRY) { return tDoCompare(func, cmptype, a, b); } #if 1 @@ -290,6 +290,7 @@ int idxUidCompare(const void* a, const void* b) { uint64_t r = *(uint64_t*)b; return l - r; } +#ifdef BUILD_NO_CALL int32_t idxConvertData(void* src, int8_t type, void** dst) { int tlen = -1; switch (type) { @@ -372,6 +373,8 @@ int32_t idxConvertData(void* src, int8_t type, void** dst) { // indexMayFillNumbericData(*dst, tlen); return tlen; } +#endif + int32_t idxConvertDataToStr(void* src, int8_t type, void** dst) { if (src == NULL) { *dst = strndup(INDEX_DATA_NULL_STR, (int)strlen(INDEX_DATA_NULL_STR)); diff --git a/source/libs/monitorfw/src/taos_collector_registry.c b/source/libs/monitorfw/src/taos_collector_registry.c index 55b08775b4..c3ed0112c5 100644 --- a/source/libs/monitorfw/src/taos_collector_registry.c +++ b/source/libs/monitorfw/src/taos_collector_registry.c @@ -216,7 +216,7 @@ int taos_collector_registry_validate_metric_name(taos_collector_registry_t *self regfree(&r); return 0; } - +/* const char *taos_collector_registry_bridge(taos_collector_registry_t *self, char *ts, char *format) { taos_metric_formatter_clear(self->metric_formatter); taos_metric_formatter_load_metrics(self->metric_formatter, self->collectors, ts, format); @@ -229,7 +229,7 @@ const char *taos_collector_registry_bridge(taos_collector_registry_t *self, char return taos_string_builder_str(self->string_builder_batch); } - +*/ int taos_collector_registry_clear_batch(taos_collector_registry_t *self){ return taos_string_builder_clear(self->string_builder_batch); } diff --git a/source/libs/monitorfw/src/taos_gauge.c b/source/libs/monitorfw/src/taos_gauge.c index 74d2665194..7793f4c464 100644 --- a/source/libs/monitorfw/src/taos_gauge.c +++ b/source/libs/monitorfw/src/taos_gauge.c @@ -38,7 +38,7 @@ int taos_gauge_destroy(taos_gauge_t *self) { self = NULL; return r; } - +/* int taos_gauge_inc(taos_gauge_t *self, const char **label_values) { TAOS_ASSERT(self != NULL); if (self == NULL) return 1; @@ -86,7 +86,7 @@ int taos_gauge_sub(taos_gauge_t *self, double r_value, const char **label_values if (sample == NULL) return 1; return taos_metric_sample_sub(sample, r_value); } - +*/ int taos_gauge_set(taos_gauge_t *self, double r_value, const char **label_values) { TAOS_ASSERT(self != NULL); if (self == NULL) return 1; diff --git a/source/libs/monitorfw/src/taos_metric_formatter.c b/source/libs/monitorfw/src/taos_metric_formatter.c index a9f35c3e8d..53012935ba 100644 --- a/source/libs/monitorfw/src/taos_metric_formatter.c +++ b/source/libs/monitorfw/src/taos_metric_formatter.c @@ -63,7 +63,7 @@ int taos_metric_formatter_destroy(taos_metric_formatter_t *self) { self = NULL; return ret; } - +/* int taos_metric_formatter_load_help(taos_metric_formatter_t *self, const char *name, const char *help) { TAOS_ASSERT(self != NULL); if (self == NULL) return 1; @@ -105,7 +105,7 @@ int taos_metric_formatter_load_type(taos_metric_formatter_t *self, const char *n return taos_string_builder_add_char(self->string_builder, '\n'); } - +*/ int taos_metric_formatter_load_l_value(taos_metric_formatter_t *self, const char *name, const char *suffix, size_t label_count, const char **label_keys, const char **label_values) { TAOS_ASSERT(self != NULL); @@ -156,7 +156,7 @@ int taos_metric_formatter_load_l_value(taos_metric_formatter_t *self, const char } return 0; } - +/* int taos_metric_formatter_load_sample(taos_metric_formatter_t *self, taos_metric_sample_t *sample, char *ts, char *format) { TAOS_ASSERT(self != NULL); @@ -185,7 +185,7 @@ int taos_metric_formatter_load_sample(taos_metric_formatter_t *self, taos_metric return taos_string_builder_add_char(self->string_builder, '\n'); } - +*/ int taos_metric_formatter_clear(taos_metric_formatter_t *self) { TAOS_ASSERT(self != NULL); return taos_string_builder_clear(self->string_builder); @@ -204,7 +204,7 @@ char *taos_metric_formatter_dump(taos_metric_formatter_t *self) { } return data; } - +/* int taos_metric_formatter_load_metric(taos_metric_formatter_t *self, taos_metric_t *metric, char *ts, char *format) { TAOS_ASSERT(self != NULL); if (self == NULL) return 1; @@ -255,3 +255,4 @@ int taos_metric_formatter_load_metrics(taos_metric_formatter_t *self, taos_map_t } return r; } +*/ \ No newline at end of file diff --git a/source/libs/monitorfw/src/taos_metric_sample.c b/source/libs/monitorfw/src/taos_metric_sample.c index ca6ea30028..c6d817b513 100644 --- a/source/libs/monitorfw/src/taos_metric_sample.c +++ b/source/libs/monitorfw/src/taos_metric_sample.c @@ -91,6 +91,7 @@ int taos_metric_sample_add(taos_metric_sample_t *self, double r_value) { return 0; } +/* int taos_metric_sample_sub(taos_metric_sample_t *self, double r_value) { TAOS_ASSERT(self != NULL); if (self->type != TAOS_GAUGE) { @@ -99,7 +100,8 @@ int taos_metric_sample_sub(taos_metric_sample_t *self, double r_value) { } #ifdef C11_ATOMIC - /*_Atomic*/ double old = atomic_load(&self->r_value); + ///_Atomic/ + double old = atomic_load(&self->r_value); for (;;) { _Atomic double new = ATOMIC_VAR_INIT(old - r_value); if (atomic_compare_exchange_weak(&self->r_value, &old, new)) { @@ -116,6 +118,7 @@ int taos_metric_sample_sub(taos_metric_sample_t *self, double r_value) { return 0; } +*/ int taos_metric_sample_set(taos_metric_sample_t *self, double r_value) { if (self->type != TAOS_GAUGE && self->type != TAOS_COUNTER) { diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 5689d26855..45da4ccf9f 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1099,10 +1099,6 @@ static int32_t createWindowLogicNodeByCount(SLogicPlanContext* pCxt, SCountWindo return TSDB_CODE_OUT_OF_MEMORY; } - if (!pCxt->pPlanCxt->streamQuery && tsDisableCount) { - return TSDB_CODE_FAILED; - } - pWindow->winType = WINDOW_TYPE_COUNT; pWindow->node.groupAction = getGroupAction(pCxt, pSelect); pWindow->node.requireDataOrder = diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 03f70604b7..1dc1db8e9c 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -17,10 +17,14 @@ #define _STREAM_BACKEDN_ROCKSDB_H_ #include "rocksdb/c.h" -//#include "streamInt.h" +// #include "streamInt.h" #include "streamState.h" #include "tcommon.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef struct SCfComparator { rocksdb_comparator_t** comp; int32_t numOfComp; @@ -244,11 +248,6 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId); int32_t taskDbBuildSnap(void* arg, SArray* pSnap); -// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); - -// STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId); -// void taskDbDestroy(void* pDb, bool flush); - int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); SBkdMgt* bkdMgtCreate(char* path); @@ -258,4 +257,10 @@ int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); void bkdMgtDestroy(SBkdMgt* bm); int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list); -#endif \ No newline at end of file + +uint32_t nextPow2(uint32_t x); +#ifdef __cplusplus +} +#endif + +#endif diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f173157da6..910fd93989 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2788,7 +2788,6 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) { STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey); return pCur; } -#ifdef BUILD_NO_CALL SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { stDebug("streamStateGetCur_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; @@ -2838,7 +2837,6 @@ int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) { STREAM_STATE_DEL_ROCKSDB(pState, "func", key); return 0; } -#endif // session cf int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { @@ -3432,7 +3430,6 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* SSessionKey tmpKey = *key; int32_t valSize = *pVLen; void* tmp = taosMemoryMalloc(valSize); - // tdbRealloc(NULL, valSize); if (!tmp) { return -1; } @@ -3506,13 +3503,11 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi return code; } -#ifdef BUILD_NO_CALL int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) { int code = 0; STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen); return code; } -#endif int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) { int code = 0; STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen); @@ -3535,10 +3530,10 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co if (pIter == NULL) { return -1; } - + size_t klen = 0; rocksdb_iter_seek(pIter, start, strlen(start)); while (rocksdb_iter_valid(pIter)) { - const char* key = rocksdb_iter_key(pIter, NULL); + const char* key = rocksdb_iter_key(pIter, &klen); int32_t vlen = 0; const char* vval = rocksdb_iter_value(pIter, (size_t*)&vlen); char* val = NULL; @@ -3700,6 +3695,8 @@ uint32_t nextPow2(uint32_t x) { x = x | (x >> 16); return x + 1; } + +#ifdef BUILD_NO_CALL int32_t copyFiles(const char* src, const char* dst) { int32_t code = 0; // opt later, just hard link @@ -3739,6 +3736,7 @@ _err: taosCloseDir(&pDir); return code >= 0 ? 0 : -1; } +#endif int32_t isBkdDataMeta(char* name, int32_t len) { const char* pCurrent = "CURRENT"; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b35f401cb9..a09b940a19 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -70,7 +70,7 @@ static void streamMetaEnvInit() { streamTimerInit(); } -void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);} +void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } void streamMetaCleanup() { taosCloseRef(streamBackendId); @@ -1104,14 +1104,14 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), }; - entry.inputRate = entry.inputQUsed * 100.0 / (2*STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); + entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) { entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate; entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); } if ((*pTask)->chkInfo.checkpointingId != 0) { - entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId)? 1:0; + entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0; entry.checkpointId = (*pTask)->chkInfo.checkpointingId; entry.chkpointTransId = (*pTask)->chkInfo.transId; @@ -1172,7 +1172,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId); } - _end: +_end: streamMetaClearHbMsg(&hbMsg); return TSDB_CODE_SUCCESS; } @@ -1304,28 +1304,28 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { } void streamMetaRLock(SStreamMeta* pMeta) { -// stTrace("vgId:%d meta-rlock", pMeta->vgId); + // stTrace("vgId:%d meta-rlock", pMeta->vgId); taosThreadRwlockRdlock(&pMeta->lock); } void streamMetaRUnLock(SStreamMeta* pMeta) { -// stTrace("vgId:%d meta-runlock", pMeta->vgId); + // stTrace("vgId:%d meta-runlock", pMeta->vgId); int32_t code = taosThreadRwlockUnlock(&pMeta->lock); if (code != TSDB_CODE_SUCCESS) { stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code); } else { -// stTrace("vgId:%d meta-runlock completed", pMeta->vgId); + // stTrace("vgId:%d meta-runlock completed", pMeta->vgId); } } void streamMetaWLock(SStreamMeta* pMeta) { -// stTrace("vgId:%d meta-wlock", pMeta->vgId); + // stTrace("vgId:%d meta-wlock", pMeta->vgId); taosThreadRwlockWrlock(&pMeta->lock); -// stTrace("vgId:%d meta-wlock completed", pMeta->vgId); + // stTrace("vgId:%d meta-wlock completed", pMeta->vgId); } void streamMetaWUnLock(SStreamMeta* pMeta) { -// stTrace("vgId:%d meta-wunlock", pMeta->vgId); + // stTrace("vgId:%d meta-wunlock", pMeta->vgId); taosThreadRwlockUnlock(&pMeta->lock); } @@ -1395,7 +1395,7 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) pMeta->sendMsgBeforeClosing = true; } - pMeta->role = (isLeader)? NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER; + pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER; streamMetaWUnLock(pMeta); if (isLeader) { @@ -1531,8 +1531,8 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { int32_t num = taosArrayGetSize(pMeta->pTaskList); - for(int32_t i = 0; i < num; ++i) { - STaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); + for (int32_t i = 0; i < num; ++i) { + STaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pTaskId, sizeof(*pTaskId)); if (ppTask == NULL) { continue; @@ -1633,7 +1633,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64 - ", readyTs:%" PRId64 " total elapsed time:%.2fs", + ", readyTs:%" PRId64 " total elapsed time:%.2fs", pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs, pStartInfo->elapsedTime / 1000.0); diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 454ed4297c..764bf6e026 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -22,7 +22,7 @@ #define DEFAULT_MAP_CAPACITY 131072 #define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 100) #define ROWS_PER_MILLISECOND 1 -#define MAX_NUM_SCALABLE_BF 100000 +#define MAX_NUM_SCALABLE_BF 64 #define MIN_NUM_SCALABLE_BF 10 #define DEFAULT_PREADD_BUCKET 1 #define MAX_INTERVAL MILLISECOND_PER_MINUTE @@ -81,7 +81,9 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) { static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) { if (watermark <= adjInterval) { watermark = TMAX(originInt / adjInterval, 1) * adjInterval; - } else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) { + } + + if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) { watermark = MAX_NUM_SCALABLE_BF * adjInterval; } return watermark; diff --git a/source/libs/stream/test/CMakeLists.txt b/source/libs/stream/test/CMakeLists.txt index c90e05bcf6..c472207b27 100644 --- a/source/libs/stream/test/CMakeLists.txt +++ b/source/libs/stream/test/CMakeLists.txt @@ -1,40 +1,104 @@ -MESSAGE(STATUS "build stream unit test") - -# GoogleTest requires at least C++11 -SET(CMAKE_CXX_STANDARD 11) -AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) # bloomFilterTest -ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp") -TARGET_LINK_LIBRARIES(streamUpdateTest - PUBLIC os util common gtest gtest_main stream executor index +#TARGET_LINK_LIBRARIES(streamUpdateTest + #PUBLIC os util common gtest gtest_main stream executor index + #) + +#TARGET_INCLUDE_DIRECTORIES( + #streamUpdateTest + #PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" + #PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" +#) + +#ADD_EXECUTABLE(checkpointTest checkpointTest.cpp) +#TARGET_LINK_LIBRARIES( + #checkpointTest + #PUBLIC os common gtest stream executor qcom index transport util +#) + +#TARGET_INCLUDE_DIRECTORIES( + #checkpointTest + #PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" +#) + +#add_executable(backendTest "") + +#target_sources(backendTest + #PRIVATE + #"backendTest.cpp" +#) + +#TARGET_LINK_LIBRARIES( + #backendTest + #PUBLIC rocksdb + #PUBLIC os common gtest stream executor qcom index transport util +#) + +#TARGET_INCLUDE_DIRECTORIES( + #backendTest + #PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" + #PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" +#) + +#add_test( + #NAME streamUpdateTest + #COMMAND streamUpdateTest +#) + +#add_test( + #NAME checkpointTest + #COMMAND checkpointTest +#) +#add_test( + #NAME backendTest + #COMMAND backendTest +#) + + +#add_executable(backendTest "") + +#target_sources(backendTest + #PUBLIC + #"backendTest.cpp" +#) + +#target_include_directories( + #backendTest + #PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" + #PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" +#) + +#target_link_libraries( + #backendTest + #PUBLIC rocksdb + #PUBLIC os common gtest stream executor qcom index transport util +#) + + +MESSAGE(STATUS "build parser unit test") + +IF(NOT TD_DARWIN) + # GoogleTest requires at least C++11 + SET(CMAKE_CXX_STANDARD 11) + AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + + ADD_EXECUTABLE(backendTest ${SOURCE_LIST}) + TARGET_LINK_LIBRARIES( + backendTest + PUBLIC rocksdb + PUBLIC os common gtest stream executor qcom index transport util vnode ) -TARGET_INCLUDE_DIRECTORIES( - streamUpdateTest - PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" - PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" -) + TARGET_INCLUDE_DIRECTORIES( + backendTest + PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" + PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" + ) -ADD_EXECUTABLE(checkpointTest checkpointTest.cpp) -TARGET_LINK_LIBRARIES( - checkpointTest - PUBLIC os common gtest stream executor qcom index transport util -) - -TARGET_INCLUDE_DIRECTORIES( - checkpointTest - PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" -) - -add_test( - NAME streamUpdateTest - COMMAND streamUpdateTest -) - -add_test( - NAME checkpointTest - COMMAND checkpointTest -) \ No newline at end of file + ADD_TEST( + NAME backendTest + COMMAND backendTest + ) +ENDIF () \ No newline at end of file diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp new file mode 100644 index 0000000000..a949748eb5 --- /dev/null +++ b/source/libs/stream/test/backendTest.cpp @@ -0,0 +1,437 @@ +#include + +#include +#include +#include +#include +#include "streamBackendRocksdb.h" +#include "streamSnapshot.h" +#include "streamState.h" +#include "tstream.h" +#include "tstreamFileState.h" +#include "tstreamUpdate.h" + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" +#pragma GCC diagnostic ignored "-Wsign-compare" +#pragma GCC diagnostic ignored "-Wformat" +#pragma GCC diagnostic ignored "-Wint-to-pointer-cast" +#pragma GCC diagnostic ignored "-Wpointer-arith" + +class BackendEnv : public ::testing::Test { + protected: + virtual void SetUp() {} + virtual void TearDown() {} +}; + +void *backendCreate() { + const char *streamPath = "/tmp"; + void *p = NULL; + + // char *absPath = NULL; + // // SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(streamPath, -1, 2); + // STaskDbWrapper *p = taskDbOpen((char *)streamPath, (char *)"stream-backend", -1); + // ASSERT(p != NULL); + return p; +} + +SStreamState *stateCreate(const char *path) { + SStreamTask *pTask = (SStreamTask *)taosMemoryCalloc(1, sizeof(SStreamTask)); + pTask->ver = 1024; + pTask->id.streamId = 1023; + pTask->id.taskId = 1111111; + SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, 0, 0, NULL); + pTask->pMeta = pMeta; + + SStreamState *p = streamStateOpen((char *)path, pTask, true, 32, 32 * 1024); + ASSERT(p != NULL); + return p; +} +void *backendOpen() { + streamMetaInit(); + const char *path = "/tmp/backend"; + SStreamState *p = stateCreate(path); + ASSERT(p != NULL); + + // write bacth + // default/state/fill/sess/func/parname/partag + int32_t size = 100; + std::vector tsArray; + for (int32_t i = 0; i < size; i++) { + int64_t ts = taosGetTimestampMs(); + SWinKey key; // = {.groupId = (uint64_t)(i), .ts = ts}; + key.groupId = (uint64_t)(i); + key.ts = ts; + const char *val = "value data"; + int32_t vlen = strlen(val); + streamStatePut_rocksdb(p, &key, (char *)val, vlen); + + tsArray.push_back(ts); + } + for (int32_t i = 0; i < size; i++) { + int64_t ts = tsArray[i]; + SWinKey key = {0}; //{.groupId = (uint64_t)(i), .ts = ts}; + key.groupId = (uint64_t)(i); + key.ts = ts; + + const char *val = "value data"; + int32_t len = 0; + char *newVal = NULL; + streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); + ASSERT(len == strlen(val)); + } + int64_t ts = tsArray[0]; + SWinKey key = {0}; // {.groupId = (uint64_t)(0), .ts = ts}; + key.groupId = (uint64_t)(0); + key.ts = ts; + + streamStateDel_rocksdb(p, &key); + + streamStateClear_rocksdb(p); + + for (int i = 0; i < size; i++) { + int64_t ts = tsArray[i]; + SWinKey key = {0}; //{.groupId = (uint64_t)(i), .ts = ts}; + key.groupId = (uint64_t)(i); + key.ts = ts; + + const char *val = "value data"; + int32_t len = 0; + char *newVal = NULL; + int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); + ASSERT(code != 0); + } + tsArray.clear(); + + for (int i = 0; i < size; i++) { + int64_t ts = taosGetTimestampMs(); + tsArray.push_back(ts); + + SWinKey key = {0}; //{.groupId = (uint64_t)(i), .ts = ts}; + key.groupId = (uint64_t)(i); + key.ts = ts; + + const char *val = "value data"; + int32_t vlen = strlen(val); + streamStatePut_rocksdb(p, &key, (char *)val, vlen); + } + + SWinKey winkey; + int32_t code = streamStateGetFirst_rocksdb(p, &key); + ASSERT(code == 0); + ASSERT(key.ts == tsArray[0]); + + SStreamStateCur *pCurr = streamStateSeekToLast_rocksdb(p); + ASSERT(pCurr != NULL); + streamStateFreeCur(pCurr); + + winkey.groupId = 0; + winkey.ts = tsArray[0]; + char *val = NULL; + int32_t len = 0; + + pCurr = streamStateSeekKeyNext_rocksdb(p, &winkey); + ASSERT(pCurr != NULL); + + streamStateFreeCur(pCurr); + + tsArray.clear(); + for (int i = 0; i < size; i++) { + int64_t ts = taosGetTimestampMs(); + tsArray.push_back(ts); + STupleKey key = {0}; + key.groupId = (uint64_t)(0); //= {.groupId = (uint64_t)(0), .ts = ts, .exprIdx = i}; + key.ts = ts; + key.exprIdx = i; + + const char *val = "Value"; + int32_t len = strlen(val); + streamStateFuncPut_rocksdb(p, &key, val, len); + } + for (int i = 0; i < size; i++) { + STupleKey key = {0}; //{.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i}; + key.groupId = (uint64_t)(0); + key.ts = tsArray[i]; + key.exprIdx = i; + + char *val = NULL; + int32_t len = 0; + streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len); + ASSERT(len == strlen("Value")); + } + for (int i = 0; i < size; i++) { + STupleKey key = {0}; //{.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i}; + key.groupId = (uint64_t)(0); + key.ts = tsArray[i]; + key.exprIdx = i; + + char *val = NULL; + int32_t len = 0; + streamStateFuncDel_rocksdb(p, &key); + } + + // session put + tsArray.clear(); + + for (int i = 0; i < size; i++) { + SSessionKey key = {0}; //{.win = {.skey = i, .ekey = i}, .groupId = (uint64_t)(0)}; + key.win.skey = i; + key.win.ekey = i; + key.groupId = (uint64_t)(0); + tsArray.push_back(i); + + const char *val = "Value"; + int32_t len = strlen(val); + streamStateSessionPut_rocksdb(p, &key, val, len); + + char *pval = NULL; + ASSERT(0 == streamStateSessionGet_rocksdb(p, &key, (void **)&pval, &len)); + ASSERT(strncmp(pval, val, len) == 0); + } + + for (int i = 0; i < size; i++) { + SSessionKey key = {0}; //{.win = {.skey = tsArray[i], .ekey = tsArray[i]}, .groupId = (uint64_t)(0)}; + key.win.skey = tsArray[i]; + key.win.ekey = tsArray[i]; + key.groupId = (uint64_t)(0); + + const char *val = "Value"; + int32_t len = strlen(val); + + char *pval = NULL; + ASSERT(0 == streamStateSessionGet_rocksdb(p, &key, (void **)&pval, &len)); + ASSERT(strncmp(pval, val, len) == 0); + taosMemoryFreeClear(pval); + } + + pCurr = streamStateSessionSeekToLast_rocksdb(p, 0); + ASSERT(pCurr != NULL); + + { + SSessionKey key; + memset(&key, 0, sizeof(key)); + char *val = NULL; + int32_t vlen = 0; + code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen); + ASSERT(code == 0); + pCurr = streamStateSessionSeekKeyPrev_rocksdb(p, &key); + + code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen); + ASSERT(code == 0); + + ASSERT(key.groupId == 0 && key.win.ekey == tsArray[tsArray.size() - 2]); + + pCurr = streamStateSessionSeekKeyNext_rocksdb(p, &key); + code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen); + ASSERT(code == 0); + ASSERT(vlen == strlen("Value")); + ASSERT(key.groupId == 0 && key.win.skey == tsArray[tsArray.size() - 1]); + + ASSERT(0 == streamStateSessionAddIfNotExist_rocksdb(p, &key, 10, (void **)&val, &len)); + + ASSERT(0 == + streamStateStateAddIfNotExist_rocksdb(p, &key, (char *)"key", strlen("key"), NULL, (void **)&val, &len)); + } + for (int i = 0; i < size; i++) { + SSessionKey key = {0}; //{.win = {.skey = tsArray[i], .ekey = tsArray[i]}, .groupId = (uint64_t)(0)}; + key.win.skey = tsArray[i]; + key.win.ekey = tsArray[i]; + key.groupId = (uint64_t)(0); + + const char *val = "Value"; + int32_t len = strlen(val); + + char *pval = NULL; + ASSERT(0 == streamStateSessionDel_rocksdb(p, &key)); + } + + for (int i = 0; i < size; i++) { + SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]}; + key.groupId = (uint64_t)(i); + key.ts = tsArray[i]; + const char *val = "Value"; + int32_t vlen = strlen(val); + ASSERT(streamStateFillPut_rocksdb(p, &key, val, vlen) == 0); + } + for (int i = 0; i < size; i++) { + SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]}; + key.groupId = (uint64_t)(i); + key.ts = tsArray[i]; + char *val = NULL; + int32_t vlen = 0; + ASSERT(streamStateFillGet_rocksdb(p, &key, (void **)&val, &vlen) == 0); + taosMemoryFreeClear(val); + } + { + SWinKey key = {0}; //{.groupId = (uint64_t)(0), .ts = tsArray[0]}; + key.groupId = (uint64_t)(0); + key.ts = tsArray[0]; + SStreamStateCur *pCurr = streamStateFillGetCur_rocksdb(p, &key); + ASSERT(pCurr != NULL); + + char *val = NULL; + int32_t vlen = 0; + ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen)); + ASSERT(vlen == strlen("Value")); + streamStateFreeCur(pCurr); + + pCurr = streamStateFillSeekKeyNext_rocksdb(p, &key); + ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen)); + ASSERT(vlen == strlen("Value") && key.groupId == 1 && key.ts == tsArray[1]); + + key.groupId = 1; + key.ts = tsArray[1]; + + pCurr = streamStateFillSeekKeyPrev_rocksdb(p, &key); + ASSERT(pCurr != NULL); + ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen)); + + ASSERT(vlen == strlen("Value") && key.groupId == 0 && key.ts == tsArray[0]); + } + + for (int i = 0; i < size - 1; i++) { + SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]}; + key.groupId = (uint64_t)(i); + key.ts = tsArray[i]; + char *val = NULL; + int32_t vlen = 0; + ASSERT(streamStateFillDel_rocksdb(p, &key) == 0); + taosMemoryFreeClear(val); + } + streamStateSessionClear_rocksdb(p); + + for (int i = 0; i < size; i++) { + char tbname[TSDB_TABLE_NAME_LEN] = {0}; + sprintf(tbname, "%s_%d", "tbname", i); + ASSERT(0 == streamStatePutParName_rocksdb(p, i, tbname)); + } + for (int i = 0; i < size; i++) { + char *val = NULL; + ASSERT(0 == streamStateGetParName_rocksdb(p, i, (void **)&val)); + ASSERT(strncmp(val, "tbname", strlen("tbname")) == 0); + taosMemoryFree(val); + } + + for (int i = 0; i < size; i++) { + char tbname[TSDB_TABLE_NAME_LEN] = {0}; + sprintf(tbname, "%s_%d", "tbname", i); + ASSERT(0 == streamStatePutParName_rocksdb(p, i, tbname)); + } + for (int i = 0; i < size; i++) { + char *val = NULL; + ASSERT(0 == streamStateGetParName_rocksdb(p, i, (void **)&val)); + ASSERT(strncmp(val, "tbname", strlen("tbname")) == 0); + taosMemoryFree(val); + } + for (int i = 0; i < size; i++) { + char key[128] = {0}; + sprintf(key, "tbname_%d", i); + char val[128] = {0}; + sprintf(val, "val_%d", i); + code = streamDefaultPut_rocksdb(p, key, val, strlen(val)); + ASSERT(code == 0); + } + for (int i = 0; i < size; i++) { + char key[128] = {0}; + sprintf(key, "tbname_%d", i); + + char *val = NULL; + int32_t len = 0; + code = streamDefaultGet_rocksdb(p, key, (void **)&val, &len); + ASSERT(code == 0); + } + SArray *result = taosArrayInit(8, sizeof(void *)); + streamDefaultIterGet_rocksdb(p, "tbname", "tbname_99", result); + ASSERT(taosArrayGetSize(result) >= 0); + + return p; + // streamStateClose((SStreamState *)p, true); +} +TEST_F(BackendEnv, checkOpen) { + SStreamState *p = (SStreamState *)backendOpen(); + int64_t tsStart = taosGetTimestampMs(); + { + void *pBatch = streamStateCreateBatch(); + int32_t size = 0; + for (int i = 0; i < size; i++) { + char key[128] = {0}; + sprintf(key, "key_%d", i); + char val[128] = {0}; + sprintf(val, "val_%d", i); + streamStatePutBatch(p, "default", (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, + (int32_t)(strlen(val)), tsStart + 100000); + } + streamStatePutBatch_rocksdb(p, pBatch); + streamStateDestroyBatch(pBatch); + } + { + void *pBatch = streamStateCreateBatch(); + int32_t size = 0; + char valBuf[256] = {0}; + for (int i = 0; i < size; i++) { + char key[128] = {0}; + sprintf(key, "key_%d", i); + char val[128] = {0}; + sprintf(val, "val_%d", i); + streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, + (int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf); + } + streamStatePutBatch_rocksdb(p, pBatch); + streamStateDestroyBatch(pBatch); + } + // do checkpoint 2 + taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2); + { + void *pBatch = streamStateCreateBatch(); + int32_t size = 0; + char valBuf[256] = {0}; + for (int i = 0; i < size; i++) { + char key[128] = {0}; + sprintf(key, "key_%d", i); + char val[128] = {0}; + sprintf(val, "val_%d", i); + streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, + (int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf); + } + streamStatePutBatch_rocksdb(p, pBatch); + streamStateDestroyBatch(pBatch); + } + + taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 3); + + const char *path = "/tmp/backend/stream"; + const char *dump = "/tmp/backend/stream/dump"; + // taosMkDir(dump); + taosMulMkDir(dump); + SBkdMgt *mgt = bkdMgtCreate((char *)path); + SArray *result = taosArrayInit(4, sizeof(void *)); + bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump); + + bkdMgtDestroy(mgt); + streamStateClose((SStreamState *)p, true); + taosRemoveDir(path); +} + +TEST_F(BackendEnv, backendChkp) { const char *path = "/tmp"; } + +typedef struct BdKV { + uint32_t k; + uint32_t v; +} BdKV; + +BdKV kvDict[] = {{0, 2}, {1, 2}, {15, 16}, {31, 32}, {56, 64}, {100, 128}, + {200, 256}, {500, 512}, {1000, 1024}, {2000, 2048}, {3000, 4096}}; + +TEST_F(BackendEnv, backendUtil) { + for (int i = 0; i < sizeof(kvDict) / sizeof(kvDict[0]); i++) { + ASSERT_EQ(nextPow2((uint32_t)(kvDict[i].k)), kvDict[i].v); + } +} + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/source/libs/stream/test/checkpointTest.cpp b/source/libs/stream/test/checkpointTest.cpp index 0dc2cc13f5..0caad479e5 100644 --- a/source/libs/stream/test/checkpointTest.cpp +++ b/source/libs/stream/test/checkpointTest.cpp @@ -25,46 +25,49 @@ #pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wsign-compare" +// tsSnodeAddress = ""; +// tsS3StreamEnabled = 0; + +#include "cos.h" #include "rsync.h" #include "streamInt.h" -#include "cos.h" -int main(int argc, char **argv) { - testing::InitGoogleTest(&argc, argv); +// int main(int argc, char **argv) { +// testing::InitGoogleTest(&argc, argv); - if (taosInitCfg("/etc/taos/", NULL, NULL, NULL, NULL, 0) != 0) { - printf("error"); - } - if (s3Init() < 0) { - return -1; - } - strcpy(tsSnodeAddress, "127.0.0.1"); - int ret = RUN_ALL_TESTS(); - s3CleanUp(); - return ret; -} +// if (taosInitCfg("/etc/taos/", NULL, NULL, NULL, NULL, 0) != 0) { +// printf("error"); +// } +// if (s3Init() < 0) { +// return -1; +// } +// strcpy(tsSnodeAddress, "127.0.0.1"); +// int ret = RUN_ALL_TESTS(); +// s3CleanUp(); +// return ret; +// } TEST(testCase, checkpointUpload_Test) { - stopRsync(); - startRsync(); + // stopRsync(); + // startRsync(); taosSsleep(5); char* id = "2013892036"; - uploadCheckpoint(id, "/root/offset/"); + // uploadCheckpoint(id, "/root/offset/"); } TEST(testCase, checkpointDownload_Test) { char* id = "2013892036"; - downloadCheckpoint(id, "/root/offset/download/"); + // downloadCheckpoint(id, "/root/offset/download/"); } TEST(testCase, checkpointDelete_Test) { char* id = "2013892036"; - deleteCheckpoint(id); + // deleteCheckpoint(id); } TEST(testCase, checkpointDeleteFile_Test) { char* id = "2013892036"; - deleteCheckpointFile(id, "offset-ver0"); + // deleteCheckpointFile(id, "offset-ver0"); } diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp index 1b999e5fb0..59171876ff 100644 --- a/source/libs/stream/test/tstreamUpdateTest.cpp +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -14,10 +14,7 @@ class StreamStateEnv : public ::testing::Test { streamMetaInit(); backend = streamBackendInit(path, 0, 0); } - virtual void TearDown() { - streamMetaCleanup(); - // indexClose(index); - } + virtual void TearDown() { streamMetaCleanup(); } const char *path = TD_TMP_DIR_PATH "stream"; void *backend; @@ -50,6 +47,14 @@ bool equalSBF(SScalableBf *left, SScalableBf *right) { } TEST(TD_STREAM_UPDATE_TEST, update) { + const char *streamPath = "/tmp"; + + char *absPath = NULL; + void *p = NULL; + // SBackendWrapper *p = streamBackendInit(streamPath, -1, 2); + // p = taskDbOpen((char *)streamPath, (char *)"test", -1); + p = bkdMgtCreate((char *)streamPath); + // const int64_t interval = 20 * 1000; // const int64_t watermark = 10 * 60 * 1000; // SUpdateInfo *pSU = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index e06ea70f70..7ff6116137 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1343,7 +1343,7 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->raftCfg int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t ret = 0; if (syncIsInit()) { - taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager, + taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid, syncEnv()->pTimerManager, &pSyncNode->pPingTimer); atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); } else { @@ -1415,8 +1415,8 @@ void syncNodeResetElectTimer(SSyncNode* pSyncNode) { static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t ret = 0; if (syncIsInit()) { - taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager, - &pSyncNode->pHeartbeatTimer); + taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid, + syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer); atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); } else { sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId); @@ -2153,7 +2153,11 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex static void syncNodeEqPingTimer(void* param, void* tmrId) { if (!syncIsInit()) return; - SSyncNode* pNode = param; + int64_t rid = (int64_t)param; + SSyncNode* pNode = syncNodeAcquire(rid); + + if (pNode == NULL) return; + if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) { SRpcMsg rpcMsg = {0}; int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock), @@ -2173,7 +2177,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { } _out: - taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer); + taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager, + &pNode->pPingTimer); } } @@ -2224,7 +2229,11 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { if (!syncIsInit()) return; - SSyncNode* pNode = param; + int64_t rid = (int64_t)param; + SSyncNode* pNode = syncNodeAcquire(rid); + + if (pNode == NULL) return; + if (pNode->totalReplicaNum > 1) { if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) { SRpcMsg rpcMsg = {0}; @@ -2245,7 +2254,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { } _out: - taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager, + taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager, &pNode->pHeartbeatTimer); } else { @@ -3385,4 +3394,4 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) { return true; } -#endif \ No newline at end of file +#endif diff --git a/source/libs/tdb/src/db/tdbPCache.c b/source/libs/tdb/src/db/tdbPCache.c index 3ee65f11dd..455128e6ec 100644 --- a/source/libs/tdb/src/db/tdbPCache.c +++ b/source/libs/tdb/src/db/tdbPCache.c @@ -316,7 +316,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) } // 3. Try to Recycle a page - if (!pPage && !pCache->lru.pLruPrev->isAnchor) { + if (!pPageH && !pPage && !pCache->lru.pLruPrev->isAnchor) { pPage = pCache->lru.pLruPrev; tdbPCacheRemovePageFromHash(pCache, pPage); tdbPCachePinPage(pCache, pPage); diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 88eb51d500..d9686d77f8 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -89,12 +89,14 @@ static int32_t taosArrayResize(SArray* pArray) { int32_t taosArrayEnsureCap(SArray* pArray, size_t newCap) { if (newCap > pArray->capacity) { float factor = BOUNDARY_BIG_FACTOR; - if(newCap * pArray->elemSize > BOUNDARY_SIZE){ + if (newCap * pArray->elemSize > BOUNDARY_SIZE) { factor = BOUNDARY_SMALL_FACTOR; } + size_t tsize = (pArray->capacity * factor); while (newCap > tsize) { - tsize = (tsize * factor); + size_t newSize = (tsize * factor); + tsize = (newSize == tsize) ? (tsize + 2) : newSize; } pArray->pData = taosMemoryRealloc(pArray->pData, tsize * pArray->elemSize); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index e24f1d7c8e..2c0e4a9d91 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -329,6 +329,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VIEW_NOT_EXIST, "view not exists in db //mnode-compact TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_COMPACT_ID, "Invalid compact id") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST, "compact detail doesn't exist") // dnode TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_OFFLINE, "Dnode is offline") diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 2cc13be6ba..7a4eb09b99 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -21,6 +21,40 @@ int64_t tsRpcQueueMemoryAllowed = 0; int64_t tsRpcQueueMemoryUsed = 0; +struct STaosQueue { + STaosQnode *head; + STaosQnode *tail; + STaosQueue *next; // for queue set + STaosQset *qset; // for queue set + void *ahandle; // for queue set + FItem itemFp; + FItems itemsFp; + TdThreadMutex mutex; + int64_t memOfItems; + int32_t numOfItems; + int64_t threadId; + int64_t memLimit; + int64_t itemLimit; +}; + +struct STaosQset { + STaosQueue *head; + STaosQueue *current; + TdThreadMutex mutex; + tsem_t sem; + int32_t numOfQueues; + int32_t numOfItems; +}; + +struct STaosQall { + STaosQnode *current; + STaosQnode *start; + int32_t numOfItems; + int64_t memOfItems; + int32_t unAccessedNumOfItems; + int64_t unAccessMemOfItems; +}; + void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t cap) { queue->memLimit = cap; } void taosSetQueueCapacity(STaosQueue *queue, int64_t size) { queue->itemLimit = size; } @@ -497,6 +531,12 @@ int64_t taosQallUnAccessedMemSize(STaosQall *qall) { return qall->unAccessMemOfI void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } +void taosQueueSetThreadId(STaosQueue* pQueue, int64_t threadId) { + pQueue->threadId = threadId; +} + +int64_t taosQueueGetThreadId(STaosQueue *pQueue) { return pQueue->threadId; } + #if 0 void taosResetQsetThread(STaosQset *qset, void *pItem) { diff --git a/source/util/src/tscalablebf.c b/source/util/src/tscalablebf.c index 3b4975b701..7af794546b 100644 --- a/source/util/src/tscalablebf.c +++ b/source/util/src/tscalablebf.c @@ -20,6 +20,9 @@ #define DEFAULT_GROWTH 2 #define DEFAULT_TIGHTENING_RATIO 0.5 +#define DEFAULT_MAX_BLOOMFILTERS 4 +#define SBF_INVALID -1 +#define SBF_VALID 0 static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate); @@ -32,6 +35,8 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) { if (pSBf == NULL) { return NULL; } + pSBf->maxBloomFilters = DEFAULT_MAX_BLOOMFILTERS; + pSBf->status = SBF_VALID; pSBf->numBits = 0; pSBf->bfArray = taosArrayInit(defaultSize, sizeof(void *)); if (tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO) == NULL) { @@ -45,6 +50,9 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) { } int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { + if (pSBf->status == SBF_INVALID) { + return TSDB_CODE_FAILED; + } int32_t size = taosArrayGetSize(pSBf->bfArray); SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1); ASSERT(pNormalBf); @@ -52,6 +60,7 @@ int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t le pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth, pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO); if (pNormalBf == NULL) { + pSBf->status = SBF_INVALID; return TSDB_CODE_OUT_OF_MEMORY; } } @@ -59,6 +68,9 @@ int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t le } int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { + if (pSBf->status == SBF_INVALID) { + return TSDB_CODE_FAILED; + } uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len); uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len); int32_t size = taosArrayGetSize(pSBf->bfArray); @@ -74,6 +86,7 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth, pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO); if (pNormalBf == NULL) { + pSBf->status = SBF_INVALID; return TSDB_CODE_OUT_OF_MEMORY; } } @@ -81,6 +94,9 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { } int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len) { + if (pSBf->status == SBF_INVALID) { + return TSDB_CODE_FAILED; + } uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len); uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len); int32_t size = taosArrayGetSize(pSBf->bfArray); @@ -93,6 +109,10 @@ int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32 } static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate) { + if (taosArrayGetSize(pSBf->bfArray) >= pSBf->maxBloomFilters) { + return NULL; + } + SBloomFilter *pNormalBf = tBloomFilterInit(expectedEntries, errorRate); if (pNormalBf == NULL) { return NULL; @@ -128,6 +148,8 @@ int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder) { } if (tEncodeU32(pEncoder, pSBf->growth) < 0) return -1; if (tEncodeU64(pEncoder, pSBf->numBits) < 0) return -1; + if (tEncodeU32(pEncoder, pSBf->maxBloomFilters) < 0) return -1; + if (tEncodeI8(pEncoder, pSBf->status) < 0) return -1; return 0; } @@ -150,6 +172,8 @@ SScalableBf *tScalableBfDecode(SDecoder *pDecoder) { } if (tDecodeU32(pDecoder, &pSBf->growth) < 0) goto _error; if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) goto _error; + if (tDecodeU32(pDecoder, &pSBf->maxBloomFilters) < 0) goto _error; + if (tDecodeI8(pDecoder, &pSBf->status) < 0) goto _error; return pSBf; _error: diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 3e591c7d7f..138d4bc1f4 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -417,9 +417,9 @@ _OVER: return NULL; } else { while (worker->pid <= 0) taosMsleep(10); - queue->threadId = worker->pid; - uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, - queue->threadId); + + taosQueueSetThreadId(queue, worker->pid); + uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, worker->pid); return queue; } } diff --git a/tests/army/community/cluster/incSnapshot.py b/tests/army/community/cluster/incSnapshot.py index 85f030eb03..dfd8d95c9c 100644 --- a/tests/army/community/cluster/incSnapshot.py +++ b/tests/army/community/cluster/incSnapshot.py @@ -46,7 +46,9 @@ class TDTestCase(TBase): # clusterDnodes.starttaosd(1) # time.sleep(5) autoGen.insert_data(5000, True) - tdSql.execute(f"flush database {self.db}") + self.flushDb(True) + # wait flush operation over + time.sleep(5) # sql = 'show vnodes;' # while True: diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index bdccf33c32..62f2d10525 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1211,6 +1211,7 @@ ,,y,script,./test.sh -f tsim/stream/deleteState.sim ,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim ,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim +,,y,script,./test.sh -f tsim/stream/distributeMultiLevelInterval0.sim ,,y,script,./test.sh -f tsim/stream/distributeSession0.sim ,,y,script,./test.sh -f tsim/stream/drop_stream.sim ,,y,script,./test.sh -f tsim/stream/event0.sim diff --git a/tests/script/coverage_test.sh b/tests/script/coverage_test.sh index 01192763a1..9f526819c8 100644 --- a/tests/script/coverage_test.sh +++ b/tests/script/coverage_test.sh @@ -214,7 +214,7 @@ function lcovFunc { '*/clientJniConnector.c' '*/clientTmqConnector.c' '*/version.c' '*/build_version.cc'\ '*/tthread.c' '*/tversion.c' '*/ctgDbg.c' '*/schDbg.c' '*/qwDbg.c' '*/tencode.h' \ '*/shellAuto.c' '*/shellTire.c' '*/shellCommand.c'\ - '*/sql.c' '*/sql.y'\ + '*/sql.c' '*/sql.y' '*/smaSnapshot.c' '*/smaCommit.c'\ --branch-coverage --function-coverage -o coverage.info # generate result diff --git a/tests/script/tsim/query/query_count0.sim b/tests/script/tsim/query/query_count0.sim index b7c629e538..5b95d4fad7 100644 --- a/tests/script/tsim/query/query_count0.sim +++ b/tests/script/tsim/query/query_count0.sim @@ -9,8 +9,6 @@ print =============== create database sql create database test vgroups 1; sql use test; -sql alter local 'disableCount' '0' ; - sql create table t1(ts timestamp, a int, b int , c int, d double); sql insert into t1 values(1648791213000,0,1,1,1.0); diff --git a/tests/script/tsim/query/query_count1.sim b/tests/script/tsim/query/query_count1.sim index 0c40303e57..0694ab062a 100644 --- a/tests/script/tsim/query/query_count1.sim +++ b/tests/script/tsim/query/query_count1.sim @@ -9,8 +9,6 @@ print =============== create database sql create database test vgroups 4; sql use test; -sql alter local 'disableCount' '0' ; - sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); diff --git a/tests/script/tsim/query/query_count_sliding0.sim b/tests/script/tsim/query/query_count_sliding0.sim index 13a6c94451..464aec6b97 100644 --- a/tests/script/tsim/query/query_count_sliding0.sim +++ b/tests/script/tsim/query/query_count_sliding0.sim @@ -9,8 +9,6 @@ print =============== create database sql create database test vgroups 1; sql use test; -sql alter local 'disableCount' '0' ; - sql create table t1(ts timestamp, a int, b int , c int, d double); sql insert into t1 values(1648791213000,0,1,1,1.0); diff --git a/tests/script/tsim/stream/distributeMultiLevelInterval0.sim b/tests/script/tsim/stream/distributeMultiLevelInterval0.sim new file mode 100644 index 0000000000..784ab7f4a5 --- /dev/null +++ b/tests/script/tsim/stream/distributeMultiLevelInterval0.sim @@ -0,0 +1,267 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 + +system sh/cfg.sh -n dnode1 -c streamAggCnt -v 2 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + + + +print ===== step1 +sql drop stream if exists streams1; +sql drop database if exists test; +sql create database test vgroups 4; +sql use test; +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create table ts3 using st tags(3,2,2); +sql create table ts4 using st tags(4,2,2); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4 from st interval(10s); + +sleep 1000 + +sql insert into ts1 values(1648791213000,1,1,3,4.1); +sql insert into ts1 values(1648791223000,2,2,3,1.1); +sql insert into ts1 values(1648791233000,3,3,3,2.1); +sql insert into ts1 values(1648791243000,4,4,3,3.1); + +sql insert into ts2 values(1648791213000,1,5,3,4.1); +sql insert into ts2 values(1648791223000,2,6,3,1.1); +sql insert into ts2 values(1648791233000,3,7,3,2.1); +sql insert into ts2 values(1648791243000,4,8,3,3.1); + + +$loop_count = 0 +loop0: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 1000 +print 2 select * from streamt1; +sql select * from streamt1; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 + +if $rows != 4 then + print =====rows=$rows + goto loop0 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop0 +endi + +if $data11 != 2 then + print =====data11=$data11 + goto loop0 +endi + +if $data21 != 2 then + print =====data21=$data21 + goto loop0 +endi + +if $data31 != 2 then + print =====data31=$data31 + goto loop0 +endi + + +sql insert into ts1 values(1648791213000,1,9,3,4.1); + +$loop_count = 0 +loop1: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 1000 +print 2 select * from streamt1; +sql select * from streamt1; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 + +if $rows != 4 then + print =====rows=$rows + goto loop1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop1 +endi + +if $data11 != 2 then + print =====data11=$data11 + goto loop1 +endi + +if $data21 != 2 then + print =====data21=$data21 + goto loop1 +endi + +if $data31 != 2 then + print =====data31=$data31 + goto loop1 +endi + +sql delete from ts2 where ts = 1648791243000 ; + +$loop_count = 0 +loop2: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 1000 +print 2 select * from streamt1; +sql select * from streamt1; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 + +if $rows != 4 then + print =====rows=$rows + goto loop2 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop2 +endi + +if $data11 != 2 then + print =====data11=$data11 + goto loop2 +endi + +if $data21 != 2 then + print =====data21=$data21 + goto loop2 +endi + +if $data31 != 1 then + print =====data31=$data31 + goto loop2 +endi + +sql delete from ts2 where ts = 1648791223000 ; + +$loop_count = 0 +loop3: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 1000 +print 2 select * from streamt1; +sql select * from streamt1; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 + +if $rows != 4 then + print =====rows=$rows + goto loop3 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop3 +endi + +if $data11 != 1 then + print =====data11=$data11 + goto loop3 +endi + +if $data21 != 2 then + print =====data21=$data21 + goto loop3 +endi + +if $data31 != 1 then + print =====data31=$data31 + goto loop3 +endi + + +sql insert into ts1 values(1648791233001,3,9,3,2.1); + +$loop_count = 0 +loop4: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 1000 +print 2 select * from streamt1; +sql select * from streamt1; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 + +if $rows != 4 then + print =====rows=$rows + goto loop4 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop4 +endi + +if $data11 != 1 then + print =====data11=$data11 + goto loop4 +endi + +if $data21 != 3 then + print =====data21=$data21 + goto loop4 +endi + +if $data31 != 1 then + print =====data31=$data31 + goto loop4 +endi + +sql select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s); + + +print ===== over + +system sh/stop_dnodes.sh diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index b6917e5a76..847bbcf4be 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -206,6 +206,7 @@ SWords shellCommands[] = { {"show grants full;", 0, 0, NULL}, {"show grants logs;", 0, 0, NULL}, #ifdef TD_ENTERPRISE + {"show views;", 0, 0, NULL}, {"split vgroup ", 0, 0, NULL}, #endif {"insert into values(", 0, 0, NULL}, @@ -570,6 +571,7 @@ void showHelp() { split vgroup ;\n\ show compacts;\n\ show compact \n\ + show views;\n\ show create view ;"); #endif diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 23424cea98..995d3d04ec 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -75,7 +75,7 @@ bool shellIsEmptyCommand(const char *cmd) { int32_t shellRunSingleCommand(char *command) { shellCmdkilled = false; - + if (shellIsEmptyCommand(command)) { return 0; } @@ -1019,7 +1019,7 @@ void shellReadHistory() { char *line = taosMemoryMalloc(TSDB_MAX_ALLOWED_SQL_LEN + 1); int32_t read_size = 0; - while ((read_size = taosGetsFile(pFile, TSDB_MAX_ALLOWED_SQL_LEN, line)) != -1) { + while ((read_size = taosGetsFile(pFile, TSDB_MAX_ALLOWED_SQL_LEN, line)) > 0) { line[read_size - 1] = '\0'; taosMemoryFree(pHistory->hist[pHistory->hend]); pHistory->hist[pHistory->hend] = taosStrdup(line); @@ -1315,7 +1315,7 @@ int32_t shellExecute() { shellSetConn(shell.conn, runOnce); shellReadHistory(); - if(shell.args.is_bi_mode) { + if(shell.args.is_bi_mode) { // need set bi mode printf("Set BI mode is true.\n"); #ifndef WEBSOCKET diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index 2c334eb67b..01619decc5 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -1018,7 +1018,7 @@ int sml_escape_Test() { ASSERT(numFields == 5); ASSERT(strncmp(fields[1].name, "inode\"i,= s_used", sizeof("inode\"i,= s_used") - 1) == 0); ASSERT(strncmp(fields[2].name, "total", sizeof("total") - 1) == 0); - ASSERT(strncmp(fields[3].name, "inode\"i,= s_f\\\\ree", sizeof("inode\"i,= s_f\\\\ree") - 1) == 0); + ASSERT(strncmp(fields[3].name, "inode\"i,= s_f\\ree", sizeof("inode\"i,= s_f\\ree") - 1) == 0); ASSERT(strncmp(fields[4].name, "dev\"i,= ce", sizeof("dev\"i,= ce") - 1) == 0); TAOS_ROW row = NULL; @@ -1044,6 +1044,88 @@ int sml_escape_Test() { return code; } +// test field with end of escape +int sml_escape1_Test() { + TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); + + TAOS_RES *pRes = taos_query(taos, "create database if not exists db_escape"); + taos_free_result(pRes); + + pRes = taos_query(taos, "use db_escape"); + taos_free_result(pRes); + + const char *sql[] = { + "stab,t1\\=1 c1=3,c2=\"32fw\" 1661943970000000000", + "stab,t1=1\\ c1=3,c2=\"32fw\" 1661943980000000000", + "stab,t1=1 c1\\=3,c2=\"32fw\" 1661943990000000000", + }; + for(int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++){ + pRes = taos_schemaless_insert(taos, (char**)&sql[i], 1, TSDB_SML_LINE_PROTOCOL, 0); + int code = taos_errno(pRes); + ASSERT(code); + } + + const char *sql1[] = { + "stab\\,t1=1 c1=3,c2=\"32fw\" 1661943960000000000", + "stab\\\\,t1=1 c1=3,c2=\"32fw\" 1661943960000000000", + "stab,t1\\\\=1 c1=3,c2=\"32fw\" 1661943970000000000", + "stab,t1=1\\\\ c1=3,c2=\"32fw\" 1661943980000000000", + "stab,t1=1 c1\\\\=3,c2=\"32fw\" 1661943990000000000", + }; + pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL, 0); + printf("%s result:%s, rows:%d\n", __FUNCTION__, taos_errstr(pRes), taos_affected_rows(pRes)); + int code = taos_errno(pRes); + ASSERT(!code); + ASSERT(taos_affected_rows(pRes) == 5); + taos_free_result(pRes); + + pRes = taos_query(taos, "select * from stab"); //check stable name + ASSERT(pRes); + int fieldNum = taos_field_count(pRes); + ASSERT(fieldNum == 6); + printf("fieldNum:%d\n", fieldNum); + + int numFields = taos_num_fields(pRes); + TAOS_FIELD *fields = taos_fetch_fields(pRes); + ASSERT(numFields == 6); + ASSERT(strncmp(fields[1].name, "c1", sizeof("c1") - 1) == 0); + ASSERT(strncmp(fields[2].name, "c2", sizeof("c2") - 1) == 0); + ASSERT(strncmp(fields[3].name, "c1\\", sizeof("c1\\") - 1) == 0); + ASSERT(strncmp(fields[4].name, "t1\\", sizeof("t1\\") - 1) == 0); + ASSERT(strncmp(fields[5].name, "t1", sizeof("t1") - 1) == 0); + + TAOS_ROW row = NULL; + int32_t rowIndex = 0; + while ((row = taos_fetch_row(pRes)) != NULL) { + int64_t ts = *(int64_t *)row[0]; + + if (ts == 1661943970000) { + ASSERT(*(double *)row[1] == 3); + ASSERT(strncmp(row[2], "32fw", sizeof("32fw") - 1) == 0); + ASSERT(row[3] == NULL); + ASSERT(strncmp(row[4], "1", sizeof("1") - 1) == 0); + ASSERT(row[5] == NULL); + }else if (ts == 1661943980000) { + ASSERT(*(double *)row[1] == 3); + ASSERT(strncmp(row[2], "32fw", sizeof("32fw") - 1) == 0); + ASSERT(row[3] == NULL); + ASSERT(row[4] == NULL); + ASSERT(strncmp(row[5], "1\\", sizeof("1\\") - 1) == 0); + }else if (ts == 1661943990000) { + ASSERT(row[1] == NULL); + ASSERT(strncmp(row[2], "32fw", sizeof("32fw") - 1) == 0); + ASSERT(*(double *)row[3] == 3); + ASSERT(row[4] == NULL); + ASSERT(strncmp(row[5], "1", sizeof("1") - 1) == 0); + } + rowIndex++; + } + taos_free_result(pRes); + taos_close(taos); + + return code; +} + int sml_19221_Test() { TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -1775,17 +1857,14 @@ int main(int argc, char *argv[]) { ASSERT(ret); ret = sml_escape_Test(); ASSERT(!ret); + ret = sml_escape1_Test(); + ASSERT(!ret); ret = sml_ts3116_Test(); ASSERT(!ret); ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file ASSERT(!ret); ret = sml_ts3303_Test(); ASSERT(!ret); - - // for(int i = 0; i < sizeof(str)/sizeof(str[0]); i++){ - // printf("str:%s \t %d\n", str[i], smlCalTypeSum(str[i], strlen(str[i]))); - // } - // int ret = 0; ret = sml_ttl_Test(); ASSERT(!ret); ret = sml_ts2164_Test();