Merge remote-tracking branch 'origin/3.0' into feat/TD-27337

This commit is contained in:
dapan1121 2024-02-29 17:55:17 +08:00
commit e17518e07f
56 changed files with 1218 additions and 265 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -219,7 +219,6 @@ extern bool tsFilterScalarMode;
extern int32_t tsMaxStreamBackendCache;
extern int32_t tsPQSortMemThreshold;
extern int32_t tsResolveFQDNRetryTime;
extern bool tsDisableCount;
extern bool tsExperimental;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
@ -234,7 +233,7 @@ int32_t taosCfgDynamicOptions(SConfig *pCfg, char *name, bool forServer);
struct SConfig *taosGetCfg();
void taosSetAllDebugFlag(int32_t flag);
void taosSetGlobalDebugFlag(int32_t flag);
void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal);
void taosLocalCfgForbiddenToChange(char *name, bool *forbidden);
int8_t taosGranted(int8_t type);

View File

@ -433,7 +433,7 @@ int32_t* taosGetErrno();
//mnode-compact
#define TSDB_CODE_MND_INVALID_COMPACT_ID TAOS_DEF_ERROR_CODE(0, 0x04B1)
#define TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x04B2)
// vnode
// #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) // 2.x

View File

@ -72,40 +72,6 @@ struct STaosQnode {
char item[];
};
struct STaosQueue {
STaosQnode *head;
STaosQnode *tail;
STaosQueue *next; // for queue set
STaosQset *qset; // for queue set
void *ahandle; // for queue set
FItem itemFp;
FItems itemsFp;
TdThreadMutex mutex;
int64_t memOfItems;
int32_t numOfItems;
int64_t threadId;
int64_t memLimit;
int64_t itemLimit;
};
struct STaosQset {
STaosQueue *head;
STaosQueue *current;
TdThreadMutex mutex;
tsem_t sem;
int32_t numOfQueues;
int32_t numOfItems;
};
struct STaosQall {
STaosQnode *current;
STaosQnode *start;
int32_t numOfItems;
int64_t memOfItems;
int32_t unAccessedNumOfItems;
int64_t unAccessMemOfItems;
};
STaosQueue *taosOpenQueue();
void taosCloseQueue(STaosQueue *queue);
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
@ -140,6 +106,8 @@ int32_t taosGetQueueNumber(STaosQset *qset);
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo);
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo);
void taosResetQsetThread(STaosQset *qset, void *pItem);
void taosQueueSetThreadId(STaosQueue *pQueue, int64_t threadId);
int64_t taosQueueGetThreadId(STaosQueue *pQueue);
#ifdef __cplusplus
}

View File

@ -26,6 +26,8 @@ typedef struct SScalableBf {
SArray *bfArray; // array of bloom filters
uint32_t growth;
uint64_t numBits;
uint32_t maxBloomFilters;
int8_t status;
_hash_fn_t hashFn1;
_hash_fn_t hashFn2;
} SScalableBf;

View File

@ -80,7 +80,7 @@ extern "C" {
#define IS_SAME_KEY (maxKV->type == kv->type && maxKV->keyLen == kv->keyLen && memcmp(maxKV->key, kv->key, kv->keyLen) == 0)
#define IS_SLASH_LETTER_IN_MEASUREMENT(sql) \
(*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE))
(*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE || *(sql) == SLASH))
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len))

View File

@ -20,14 +20,14 @@
#include "clientSml.h"
#define IS_COMMA(sql) (*(sql) == COMMA && *((sql)-1) != SLASH)
#define IS_SPACE(sql) (*(sql) == SPACE && *((sql)-1) != SLASH)
#define IS_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) != SLASH)
#define IS_COMMA(sql,escapeChar) (*(sql) == COMMA && (*((sql)-1) != SLASH || ((sql)-1 == escapeChar)))
#define IS_SPACE(sql,escapeChar) (*(sql) == SPACE && (*((sql)-1) != SLASH || ((sql)-1 == escapeChar)))
#define IS_EQUAL(sql,escapeChar) (*(sql) == EQUAL && (*((sql)-1) != SLASH || ((sql)-1 == escapeChar)))
#define IS_SLASH_LETTER_IN_FIELD_VALUE(sql) (*((sql)-1) == SLASH && (*(sql) == QUOTE || *(sql) == SLASH))
#define IS_SLASH_LETTER_IN_TAG_FIELD_KEY(sql) \
(*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE || *(sql) == EQUAL))
(*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE || *(sql) == EQUAL || *(sql) == SLASH))
#define PROCESS_SLASH_IN_FIELD_VALUE(key, keyLen) \
for (int i = 1; i < keyLen; ++i) { \
@ -198,7 +198,7 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){
int cnt = 0;
while (*sql < sqlEnd) {
if (unlikely(IS_SPACE(*sql))) {
if (unlikely(IS_SPACE(*sql,NULL))) {
break;
}
@ -207,18 +207,21 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){
size_t keyLen = 0;
bool keyEscaped = false;
size_t keyLenEscaped = 0;
const char *escapeChar = NULL;
while (*sql < sqlEnd) {
if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
if (unlikely(IS_SPACE(*sql,escapeChar) || IS_COMMA(*sql,escapeChar))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
terrno = TSDB_CODE_SML_INVALID_DATA;
return -1;
}
if (unlikely(IS_EQUAL(*sql))) {
if (unlikely(IS_EQUAL(*sql,escapeChar))) {
keyLen = *sql - key;
(*sql)++;
break;
}
if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(*sql)) {
escapeChar = *sql;
keyLenEscaped++;
keyEscaped = true;
}
@ -238,15 +241,16 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){
size_t valueLenEscaped = 0;
while (*sql < sqlEnd) {
// parse value
if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
if (unlikely(IS_SPACE(*sql,escapeChar) || IS_COMMA(*sql,escapeChar))) {
break;
} else if (unlikely(IS_EQUAL(*sql))) {
} else if (unlikely(IS_EQUAL(*sql,escapeChar))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
terrno = TSDB_CODE_SML_INVALID_DATA;
return -1;
}
if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(*sql)) {
escapeChar = *sql;
valueLenEscaped++;
valueEscaped = true;
}
@ -293,7 +297,7 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){
}
cnt++;
if (IS_SPACE(*sql)) {
if (IS_SPACE(*sql,escapeChar)) {
break;
}
(*sql)++;
@ -326,7 +330,7 @@ static int32_t smlParseTagLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *currElement) {
int cnt = 0;
while (*sql < sqlEnd) {
if (unlikely(IS_SPACE(*sql))) {
if (unlikely(IS_SPACE(*sql,NULL))) {
break;
}
@ -335,17 +339,19 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
size_t keyLen = 0;
bool keyEscaped = false;
size_t keyLenEscaped = 0;
const char *escapeChar = NULL;
while (*sql < sqlEnd) {
if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
if (unlikely(IS_SPACE(*sql,escapeChar) || IS_COMMA(*sql,escapeChar))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
return TSDB_CODE_SML_INVALID_DATA;
}
if (unlikely(IS_EQUAL(*sql))) {
if (unlikely(IS_EQUAL(*sql,escapeChar))) {
keyLen = *sql - key;
(*sql)++;
break;
}
if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(*sql)) {
escapeChar = *sql;
keyLenEscaped++;
keyEscaped = true;
}
@ -363,7 +369,6 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
bool valueEscaped = false;
size_t valueLenEscaped = 0;
int quoteNum = 0;
const char *escapeChar = NULL;
while (*sql < sqlEnd) {
// parse value
if (unlikely(*(*sql) == QUOTE && (*(*sql - 1) != SLASH || (*sql - 1) == escapeChar))) {
@ -374,7 +379,7 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
}
continue;
}
if (quoteNum % 2 == 0 && (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql)))) {
if (quoteNum % 2 == 0 && (unlikely(IS_SPACE(*sql,escapeChar) || IS_COMMA(*sql,escapeChar)))) {
break;
}
if (IS_SLASH_LETTER_IN_FIELD_VALUE(*sql) && (*sql - 1) != escapeChar) {
@ -437,7 +442,7 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
}
cnt++;
if (IS_SPACE(*sql)) {
if (IS_SPACE(*sql,escapeChar)) {
break;
}
(*sql)++;
@ -453,19 +458,18 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
elements->measure = sql;
// parse measure
size_t measureLenEscaped = 0;
const char *escapeChar = NULL;
while (sql < sqlEnd) {
if (unlikely((sql != elements->measure) && IS_SLASH_LETTER_IN_MEASUREMENT(sql))) {
elements->measureEscaped = true;
measureLenEscaped++;
sql++;
continue;
}
if (unlikely(IS_COMMA(sql))) {
if (unlikely(IS_COMMA(sql,escapeChar) || IS_SPACE(sql,escapeChar))) {
break;
}
if (unlikely(IS_SPACE(sql))) {
break;
if (unlikely((sql != elements->measure) && IS_SLASH_LETTER_IN_MEASUREMENT(sql))) {
elements->measureEscaped = true;
escapeChar = sql;
measureLenEscaped++;
sql++;
continue;
}
sql++;
}
@ -478,9 +482,12 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
// to get measureTagsLen before
const char *tmp = sql;
while (tmp < sqlEnd) {
if (unlikely(IS_SPACE(tmp))) {
if (unlikely(IS_SPACE(tmp,escapeChar))) {
break;
}
if(unlikely(IS_SLASH_LETTER_IN_TAG_FIELD_KEY(tmp))){
escapeChar = tmp;
}
tmp++;
}
elements->measureTagsLen = tmp - elements->measure;

View File

@ -876,12 +876,13 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
STaosQall* qall = taosAllocateQall();
taosReadAllQitems(pTmq->delayedTask, qall);
if (qall->numOfItems == 0) {
int32_t numOfItems = taosQallItemSize(qall);
if (numOfItems == 0) {
taosFreeQall(qall);
return TSDB_CODE_SUCCESS;
}
tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);
tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems);
int8_t* pTaskType = NULL;
taosGetQitem(qall, (void**)&pTaskType);
@ -1839,7 +1840,7 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal
}
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall));
while (1) {
SMqRspWrapper* pRspWrapper = NULL;

View File

@ -269,7 +269,6 @@ int64_t tsStreamBufferSize = 128 * 1024 * 1024;
bool tsFilterScalarMode = false;
int tsResolveFQDNRetryTime = 100; // seconds
int tsStreamAggCnt = 1000;
bool tsDisableCount = true;
char tsS3Endpoint[TSDB_FQDN_LEN] = "<endpoint>";
char tsS3AccessKey[TSDB_FQDN_LEN] = "<accesskey>";
@ -541,8 +540,6 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "disableCount", tsDisableCount, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1;
return 0;
}
@ -1109,8 +1106,6 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsKeepAliveIdle = cfgGetItem(pCfg, "keepAliveIdle")->i32;
tsExperimental = cfgGetItem(pCfg, "experimental")->bval;
tsDisableCount = cfgGetItem(pCfg, "disableCount")->bval;
return 0;
}
@ -1263,6 +1258,8 @@ static int32_t taosSetReleaseCfg(SConfig *pCfg) { return 0; }
int32_t taosSetReleaseCfg(SConfig *pCfg);
#endif
static void taosSetAllDebugFlag(SConfig *pCfg, int32_t flag);
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) {
if (tsCfg == NULL) osDefaultInit();
@ -1307,7 +1304,7 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
taosSetServerLogCfg(pCfg);
}
taosSetAllDebugFlag(cfgGetItem(pCfg, "debugFlag")->i32);
taosSetAllDebugFlag(pCfg, cfgGetItem(pCfg, "debugFlag")->i32);
if (taosMulModeMkDir(tsLogDir, 0777, true) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
@ -1385,7 +1382,7 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile
taosSetSystemCfg(tsCfg);
if (taosSetFileHandlesLimit() != 0) return -1;
taosSetAllDebugFlag(cfgGetItem(tsCfg, "debugFlag")->i32);
taosSetAllDebugFlag(tsCfg, cfgGetItem(tsCfg, "debugFlag")->i32);
cfgDumpCfg(tsCfg, tsc, false);
@ -1478,7 +1475,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) {
}
if (strncasecmp(name, "debugFlag", 9) == 0) {
taosSetAllDebugFlag(pItem->i32);
taosSetAllDebugFlag(pCfg, pItem->i32);
return 0;
}
@ -1552,7 +1549,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
switch (lowcaseName[0]) {
case 'd': {
if (strcasecmp("debugFlag", name) == 0) {
taosSetAllDebugFlag(pItem->i32);
taosSetAllDebugFlag(pCfg, pItem->i32);
matched = true;
}
break;
@ -1737,8 +1734,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
{"shellActivityTimer", &tsShellActivityTimer},
{"slowLogThreshold", &tsSlowLogThreshold},
{"useAdapter", &tsUseAdapter},
{"experimental", &tsExperimental},
{"disableCount", &tsDisableCount}};
{"experimental", &tsExperimental}};
if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) {
taosCfgSetOption(options, tListLen(options), pItem, false);
@ -1777,11 +1773,13 @@ static void taosCheckAndSetDebugFlag(int32_t *pFlagPtr, char *name, int32_t flag
taosSetDebugFlag(pFlagPtr, name, flag);
}
void taosSetAllDebugFlag(int32_t flag) {
void taosSetGlobalDebugFlag(int32_t flag) { taosSetAllDebugFlag(tsCfg, flag); }
static void taosSetAllDebugFlag(SConfig *pCfg, int32_t flag) {
if (flag <= 0) return;
SArray *noNeedToSetVars = NULL;
SConfigItem *pItem = cfgGetItem(tsCfg, "debugFlag");
SConfigItem *pItem = cfgGetItem(pCfg, "debugFlag");
if (pItem != NULL) {
pItem->i32 = flag;
noNeedToSetVars = pItem->array;

View File

@ -5238,11 +5238,11 @@ int32_t tDeserializeSQueryCompactProgressRsp(void *buf, int32_t bufLen, SQueryCo
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->compactId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numberFileset) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->finished) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->compactId) < 0) return -2;
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -3;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -4;
if (tDecodeI32(&decoder, &pReq->numberFileset) < 0) return -5;
if (tDecodeI32(&decoder, &pReq->finished) < 0) return -6;
tEndDecode(&decoder);
tDecoderClear(&decoder);

View File

@ -68,7 +68,7 @@ static struct {
int64_t startTime;
} global = {0};
static void dmSetDebugFlag(int32_t signum, void *sigInfo, void *context) { taosSetAllDebugFlag(143); }
static void dmSetDebugFlag(int32_t signum, void *sigInfo, void *context) { taosSetGlobalDebugFlag(143); }
static void dmSetAssert(int32_t signum, void *sigInfo, void *context) { tsAssert = 1; }
static void dmStopDnode(int signum, void *sigInfo, void *context) {

View File

@ -194,26 +194,26 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
while (pVnode->refCount > 0) taosMsleep(10);
dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
pVnode->pWriteW.queue->threadId);
taosQueueGetThreadId(pVnode->pWriteW.queue));
tMultiWorkerCleanup(&pVnode->pWriteW);
dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
pVnode->pSyncW.queue->threadId);
taosQueueGetThreadId(pVnode->pSyncW.queue));
tMultiWorkerCleanup(&pVnode->pSyncW);
dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
pVnode->pSyncRdW.queue->threadId);
taosQueueGetThreadId(pVnode->pSyncRdW.queue));
tMultiWorkerCleanup(&pVnode->pSyncRdW);
dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
pVnode->pApplyW.queue->threadId);
taosQueueGetThreadId(pVnode->pApplyW.queue));
tMultiWorkerCleanup(&pVnode->pApplyW);
dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
pVnode->pFetchQ->threadId);
taosQueueGetThreadId(pVnode->pFetchQ));
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
tqNotifyClose(pVnode->pImpl->pTq);

View File

@ -365,16 +365,16 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
}
dInfo("vgId:%d, write-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
pVnode->pWriteW.queue->threadId);
taosQueueGetThreadId(pVnode->pWriteW.queue));
dInfo("vgId:%d, sync-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
pVnode->pSyncW.queue->threadId);
taosQueueGetThreadId(pVnode->pSyncW.queue));
dInfo("vgId:%d, sync-rd-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
pVnode->pSyncRdW.queue->threadId);
taosQueueGetThreadId(pVnode->pSyncRdW.queue));
dInfo("vgId:%d, apply-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
pVnode->pApplyW.queue->threadId);
taosQueueGetThreadId(pVnode->pApplyW.queue));
dInfo("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ);
dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
pVnode->pFetchQ->threadId);
taosQueueGetThreadId(pVnode->pFetchQ));
dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ);
return 0;
}

View File

@ -14,6 +14,10 @@ IF (TD_STORAGE)
ENDIF ()
IF (DEFINED GRANT_CFG_INCLUDE_DIR)
add_definitions(-DGRANTS_CFG)
ENDIF()
target_include_directories(
dnode
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"

View File

@ -26,6 +26,10 @@ target_link_libraries(
mnode scheduler sdb wal transport cjson sync monitor executor qworker stream parser audit monitorfw
)
IF (DEFINED GRANT_CFG_INCLUDE_DIR)
add_definitions(-DGRANTS_CFG)
ENDIF()
IF (TD_GRANT)
TARGET_LINK_LIBRARIES(mnode grant)
ADD_DEFINITIONS(-D_GRANT)

View File

@ -240,7 +240,7 @@ int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompac
SSdbRaw *pVgRaw = mndCompactActionEncode(pCompact);
if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) {
if (mndTransAppendPrepareLog(pTrans, pVgRaw) != 0) {
sdbFreeRaw(pVgRaw);
return -1;
}
@ -363,13 +363,15 @@ static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *p
}
static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompact) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "kill-compact");
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-compact");
if (pTrans == NULL) {
mError("compact:%" PRId32 ", failed to drop since %s" , pCompact->compactId, terrstr());
return -1;
}
mInfo("trans:%d, used to kill compact:%" PRId32, pTrans->id, pCompact->compactId);
mndTransSetDbName(pTrans, pCompact->dbname, NULL);
SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
@ -488,13 +490,17 @@ static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t c
sdbRelease(pMnode->pSdb, pDetail);
}
return -1;
return TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST;
}
int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){
SQueryCompactProgressRsp req = {0};
if (tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req) != 0) {
int32_t code = 0;
code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req);
if (code != 0) {
terrno = TSDB_CODE_INVALID_MSG;
mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d",
code, pReq->pCont, pReq->contLen);
return -1;
}
@ -502,10 +508,10 @@ int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){
req.compactId, req.vgId, req.dnodeId, req.numberFileset, req.finished);
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
if(mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req) != 0){
code = mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req);
if(code != 0){
terrno = code;
mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
req.compactId, req.vgId, req.dnodeId, req.numberFileset, req.finished);
return -1;
@ -612,7 +618,7 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
return 0;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-compact-progress");
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-compact-progress");
if (pTrans == NULL) {
mError("trans:%" PRId32 ", failed to create since %s" , pTrans->id, terrstr());
return -1;
@ -621,6 +627,8 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
mndTransSetDbName(pTrans, pCompact->dbname, NULL);
pIter = NULL;
while (1) {
SCompactDetailObj *pDetail = NULL;

View File

@ -484,13 +484,16 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
snprintf(pVnode->monitor.strDnodeId, TSDB_NODE_ID_LEN, "%"PRId32, pVnode->config.syncCfg.nodeInfo[0].nodeId);
snprintf(pVnode->monitor.strVgId, TSDB_VGROUP_ID_LEN, "%"PRId32, pVnode->config.vgId);
if(pVnode->monitor.insertCounter == NULL){
if(tsEnableMonitor && pVnode->monitor.insertCounter == NULL){
taos_counter_t *counter = NULL;
counter = taos_collector_registry_get_metric(VNODE_METRIC_SQL_COUNT);
if(counter == NULL){
int32_t label_count = 7;
const char *sample_labels[] = {VNODE_METRIC_TAG_NAME_SQL_TYPE, VNODE_METRIC_TAG_NAME_CLUSTER_ID,
VNODE_METRIC_TAG_NAME_DNODE_ID, VNODE_METRIC_TAG_NAME_DNODE_EP,
VNODE_METRIC_TAG_NAME_VGROUP_ID, VNODE_METRIC_TAG_NAME_USERNAME,
VNODE_METRIC_TAG_NAME_RESULT};
taos_counter_t *counter = taos_counter_new(VNODE_METRIC_SQL_COUNT, "counter for insert sql",
counter = taos_counter_new(VNODE_METRIC_SQL_COUNT, "counter for insert sql",
label_count, sample_labels);
vInfo("vgId:%d, new metric:%p",TD_VID(pVnode), counter);
if(taos_collector_registry_register_metric(counter) == 1){
@ -498,6 +501,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
counter = taos_collector_registry_get_metric(VNODE_METRIC_SQL_COUNT);
vInfo("vgId:%d, get metric from registry:%p",TD_VID(pVnode), counter);
}
}
pVnode->monitor.insertCounter = counter;
vInfo("vgId:%d, succeed to set metric:%p",TD_VID(pVnode), counter);
}

View File

@ -16,6 +16,7 @@
#include "audit.h"
#include "cos.h"
#include "tencode.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tstrbuild.h"
#include "vnd.h"
@ -1708,7 +1709,7 @@ _exit:
atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows);
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
if(pSubmitRsp->affectedRows > 0 && strlen(pOriginalMsg->info.conn.user) > 0){
if(tsEnableMonitor && pSubmitRsp->affectedRows > 0 && strlen(pOriginalMsg->info.conn.user) > 0){
const char *sample_labels[] = {VNODE_METRIC_TAG_VALUE_INSERT_AFFECTED_ROWS, pVnode->monitor.strClusterId,
pVnode->monitor.strDnodeId, tsLocalEp, pVnode->monitor.strVgId,
pOriginalMsg->info.conn.user, "Success"};

View File

@ -94,10 +94,10 @@ int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < pBlock->info.rows;) {
int32_t step = pInfo->windowSliding;
SCountWindowResult* pBuffInfo = setCountWindowOutputBuff(pExprSup, &pInfo->countSup, &pInfo->pRow);
int32_t prevRows = pBuffInfo->winRows;
int32_t num = updateCountWindowInfo(i, pBlock->info.rows, pInfo->windowCount, &pBuffInfo->winRows);
int32_t step = num;
if (prevRows == 0) {
pInfo->pRow->win.skey = tsCols[i];
}
@ -118,6 +118,8 @@ int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
if (prevRows <= pInfo->windowSliding) {
if (pBuffInfo->winRows > pInfo->windowSliding) {
step = pInfo->windowSliding - prevRows;
} else {
step = pInfo->windowSliding;
}
} else {
step = 0;

View File

@ -1009,6 +1009,22 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
pSup->deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT) {
SStreamCountAggOperatorInfo* pInfo = pOperator->info;
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
pSup->calTriggerSaved = pSup->calTrigger;
pSup->deleteMarkSaved = pSup->deleteMark;
pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
pSup->deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
qInfo("save stream task:%s, param for state: %d", GET_TASKID(pTaskInfo), pInfo->ignoreExpiredData);
}
// iterate operator tree

View File

@ -3763,8 +3763,9 @@ static int32_t stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) {
taosMemoryFree(pSubTblsInfo);
pInfo->pSubTablesMergeInfo = NULL;
}
taosMemoryTrim(0);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -76,8 +76,8 @@ char* idxInt2str(int64_t val, char* dst, int radix) {
return dst - 1;
}
__compar_fn_t idxGetCompar(int8_t type) {
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY ||
type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_GEOMETRY) {
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_NCHAR ||
type == TSDB_DATA_TYPE_GEOMETRY) {
return (__compar_fn_t)strcmp;
}
return getComparFunc(type, 0);
@ -108,8 +108,8 @@ static FORCE_INLINE TExeCond tCompareEqual(void* a, void* b, int8_t type) {
return tCompare(func, QUERY_TERM, a, b, type);
}
TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t dtype) {
if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR ||
dtype == TSDB_DATA_TYPE_VARBINARY || dtype == TSDB_DATA_TYPE_GEOMETRY) {
if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR || dtype == TSDB_DATA_TYPE_VARBINARY ||
dtype == TSDB_DATA_TYPE_GEOMETRY) {
return tDoCompare(func, cmptype, a, b);
}
#if 1
@ -290,6 +290,7 @@ int idxUidCompare(const void* a, const void* b) {
uint64_t r = *(uint64_t*)b;
return l - r;
}
#ifdef BUILD_NO_CALL
int32_t idxConvertData(void* src, int8_t type, void** dst) {
int tlen = -1;
switch (type) {
@ -372,6 +373,8 @@ int32_t idxConvertData(void* src, int8_t type, void** dst) {
// indexMayFillNumbericData(*dst, tlen);
return tlen;
}
#endif
int32_t idxConvertDataToStr(void* src, int8_t type, void** dst) {
if (src == NULL) {
*dst = strndup(INDEX_DATA_NULL_STR, (int)strlen(INDEX_DATA_NULL_STR));

View File

@ -216,7 +216,7 @@ int taos_collector_registry_validate_metric_name(taos_collector_registry_t *self
regfree(&r);
return 0;
}
/*
const char *taos_collector_registry_bridge(taos_collector_registry_t *self, char *ts, char *format) {
taos_metric_formatter_clear(self->metric_formatter);
taos_metric_formatter_load_metrics(self->metric_formatter, self->collectors, ts, format);
@ -229,7 +229,7 @@ const char *taos_collector_registry_bridge(taos_collector_registry_t *self, char
return taos_string_builder_str(self->string_builder_batch);
}
*/
int taos_collector_registry_clear_batch(taos_collector_registry_t *self){
return taos_string_builder_clear(self->string_builder_batch);
}

View File

@ -38,7 +38,7 @@ int taos_gauge_destroy(taos_gauge_t *self) {
self = NULL;
return r;
}
/*
int taos_gauge_inc(taos_gauge_t *self, const char **label_values) {
TAOS_ASSERT(self != NULL);
if (self == NULL) return 1;
@ -86,7 +86,7 @@ int taos_gauge_sub(taos_gauge_t *self, double r_value, const char **label_values
if (sample == NULL) return 1;
return taos_metric_sample_sub(sample, r_value);
}
*/
int taos_gauge_set(taos_gauge_t *self, double r_value, const char **label_values) {
TAOS_ASSERT(self != NULL);
if (self == NULL) return 1;

View File

@ -63,7 +63,7 @@ int taos_metric_formatter_destroy(taos_metric_formatter_t *self) {
self = NULL;
return ret;
}
/*
int taos_metric_formatter_load_help(taos_metric_formatter_t *self, const char *name, const char *help) {
TAOS_ASSERT(self != NULL);
if (self == NULL) return 1;
@ -105,7 +105,7 @@ int taos_metric_formatter_load_type(taos_metric_formatter_t *self, const char *n
return taos_string_builder_add_char(self->string_builder, '\n');
}
*/
int taos_metric_formatter_load_l_value(taos_metric_formatter_t *self, const char *name, const char *suffix,
size_t label_count, const char **label_keys, const char **label_values) {
TAOS_ASSERT(self != NULL);
@ -156,7 +156,7 @@ int taos_metric_formatter_load_l_value(taos_metric_formatter_t *self, const char
}
return 0;
}
/*
int taos_metric_formatter_load_sample(taos_metric_formatter_t *self, taos_metric_sample_t *sample,
char *ts, char *format) {
TAOS_ASSERT(self != NULL);
@ -185,7 +185,7 @@ int taos_metric_formatter_load_sample(taos_metric_formatter_t *self, taos_metric
return taos_string_builder_add_char(self->string_builder, '\n');
}
*/
int taos_metric_formatter_clear(taos_metric_formatter_t *self) {
TAOS_ASSERT(self != NULL);
return taos_string_builder_clear(self->string_builder);
@ -204,7 +204,7 @@ char *taos_metric_formatter_dump(taos_metric_formatter_t *self) {
}
return data;
}
/*
int taos_metric_formatter_load_metric(taos_metric_formatter_t *self, taos_metric_t *metric, char *ts, char *format) {
TAOS_ASSERT(self != NULL);
if (self == NULL) return 1;
@ -255,3 +255,4 @@ int taos_metric_formatter_load_metrics(taos_metric_formatter_t *self, taos_map_t
}
return r;
}
*/

View File

@ -91,6 +91,7 @@ int taos_metric_sample_add(taos_metric_sample_t *self, double r_value) {
return 0;
}
/*
int taos_metric_sample_sub(taos_metric_sample_t *self, double r_value) {
TAOS_ASSERT(self != NULL);
if (self->type != TAOS_GAUGE) {
@ -99,7 +100,8 @@ int taos_metric_sample_sub(taos_metric_sample_t *self, double r_value) {
}
#ifdef C11_ATOMIC
/*_Atomic*/ double old = atomic_load(&self->r_value);
///_Atomic/
double old = atomic_load(&self->r_value);
for (;;) {
_Atomic double new = ATOMIC_VAR_INIT(old - r_value);
if (atomic_compare_exchange_weak(&self->r_value, &old, new)) {
@ -116,6 +118,7 @@ int taos_metric_sample_sub(taos_metric_sample_t *self, double r_value) {
return 0;
}
*/
int taos_metric_sample_set(taos_metric_sample_t *self, double r_value) {
if (self->type != TAOS_GAUGE && self->type != TAOS_COUNTER) {

View File

@ -1099,10 +1099,6 @@ static int32_t createWindowLogicNodeByCount(SLogicPlanContext* pCxt, SCountWindo
return TSDB_CODE_OUT_OF_MEMORY;
}
if (!pCxt->pPlanCxt->streamQuery && tsDisableCount) {
return TSDB_CODE_FAILED;
}
pWindow->winType = WINDOW_TYPE_COUNT;
pWindow->node.groupAction = getGroupAction(pCxt, pSelect);
pWindow->node.requireDataOrder =

View File

@ -17,10 +17,14 @@
#define _STREAM_BACKEDN_ROCKSDB_H_
#include "rocksdb/c.h"
//#include "streamInt.h"
// #include "streamInt.h"
#include "streamState.h"
#include "tcommon.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SCfComparator {
rocksdb_comparator_t** comp;
int32_t numOfComp;
@ -244,11 +248,6 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId);
int32_t taskDbBuildSnap(void* arg, SArray* pSnap);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
// STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId);
// void taskDbDestroy(void* pDb, bool flush);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);
SBkdMgt* bkdMgtCreate(char* path);
@ -258,4 +257,10 @@ int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);
void bkdMgtDestroy(SBkdMgt* bm);
int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list);
uint32_t nextPow2(uint32_t x);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -2788,7 +2788,6 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey);
return pCur;
}
#ifdef BUILD_NO_CALL
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
stDebug("streamStateGetCur_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
@ -2838,7 +2837,6 @@ int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) {
STREAM_STATE_DEL_ROCKSDB(pState, "func", key);
return 0;
}
#endif
// session cf
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
@ -3432,7 +3430,6 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
SSessionKey tmpKey = *key;
int32_t valSize = *pVLen;
void* tmp = taosMemoryMalloc(valSize);
// tdbRealloc(NULL, valSize);
if (!tmp) {
return -1;
}
@ -3506,13 +3503,11 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
return code;
}
#ifdef BUILD_NO_CALL
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen);
return code;
}
#endif
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen);
@ -3535,10 +3530,10 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
if (pIter == NULL) {
return -1;
}
size_t klen = 0;
rocksdb_iter_seek(pIter, start, strlen(start));
while (rocksdb_iter_valid(pIter)) {
const char* key = rocksdb_iter_key(pIter, NULL);
const char* key = rocksdb_iter_key(pIter, &klen);
int32_t vlen = 0;
const char* vval = rocksdb_iter_value(pIter, (size_t*)&vlen);
char* val = NULL;
@ -3700,6 +3695,8 @@ uint32_t nextPow2(uint32_t x) {
x = x | (x >> 16);
return x + 1;
}
#ifdef BUILD_NO_CALL
int32_t copyFiles(const char* src, const char* dst) {
int32_t code = 0;
// opt later, just hard link
@ -3739,6 +3736,7 @@ _err:
taosCloseDir(&pDir);
return code >= 0 ? 0 : -1;
}
#endif
int32_t isBkdDataMeta(char* name, int32_t len) {
const char* pCurrent = "CURRENT";

View File

@ -70,7 +70,7 @@ static void streamMetaEnvInit() {
streamTimerInit();
}
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);}
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
void streamMetaCleanup() {
taosCloseRef(streamBackendId);
@ -1104,14 +1104,14 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
};
entry.inputRate = entry.inputQUsed * 100.0 / (2*STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate;
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
}
if ((*pTask)->chkInfo.checkpointingId != 0) {
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId)? 1:0;
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0;
entry.checkpointId = (*pTask)->chkInfo.checkpointingId;
entry.chkpointTransId = (*pTask)->chkInfo.transId;
@ -1172,7 +1172,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
}
_end:
_end:
streamMetaClearHbMsg(&hbMsg);
return TSDB_CODE_SUCCESS;
}
@ -1304,28 +1304,28 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
}
void streamMetaRLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
taosThreadRwlockRdlock(&pMeta->lock);
}
void streamMetaRUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-runlock", pMeta->vgId);
// stTrace("vgId:%d meta-runlock", pMeta->vgId);
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code);
} else {
// stTrace("vgId:%d meta-runlock completed", pMeta->vgId);
// stTrace("vgId:%d meta-runlock completed", pMeta->vgId);
}
}
void streamMetaWLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
taosThreadRwlockWrlock(&pMeta->lock);
// stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
// stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
}
void streamMetaWUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
taosThreadRwlockUnlock(&pMeta->lock);
}
@ -1395,7 +1395,7 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
pMeta->sendMsgBeforeClosing = true;
}
pMeta->role = (isLeader)? NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER;
pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER;
streamMetaWUnLock(pMeta);
if (isLeader) {
@ -1531,7 +1531,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
int32_t num = taosArrayGetSize(pMeta->pTaskList);
for(int32_t i = 0; i < num; ++i) {
for (int32_t i = 0; i < num; ++i) {
STaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pTaskId, sizeof(*pTaskId));
if (ppTask == NULL) {

View File

@ -22,7 +22,7 @@
#define DEFAULT_MAP_CAPACITY 131072
#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 100)
#define ROWS_PER_MILLISECOND 1
#define MAX_NUM_SCALABLE_BF 100000
#define MAX_NUM_SCALABLE_BF 64
#define MIN_NUM_SCALABLE_BF 10
#define DEFAULT_PREADD_BUCKET 1
#define MAX_INTERVAL MILLISECOND_PER_MINUTE
@ -81,7 +81,9 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) {
static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) {
if (watermark <= adjInterval) {
watermark = TMAX(originInt / adjInterval, 1) * adjInterval;
} else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
}
if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
watermark = MAX_NUM_SCALABLE_BF * adjInterval;
}
return watermark;

View File

@ -1,40 +1,104 @@
MESSAGE(STATUS "build stream unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
# bloomFilterTest
ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp")
TARGET_LINK_LIBRARIES(streamUpdateTest
PUBLIC os util common gtest gtest_main stream executor index
#TARGET_LINK_LIBRARIES(streamUpdateTest
#PUBLIC os util common gtest gtest_main stream executor index
#)
#TARGET_INCLUDE_DIRECTORIES(
#streamUpdateTest
#PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/"
#PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
#)
#ADD_EXECUTABLE(checkpointTest checkpointTest.cpp)
#TARGET_LINK_LIBRARIES(
#checkpointTest
#PUBLIC os common gtest stream executor qcom index transport util
#)
#TARGET_INCLUDE_DIRECTORIES(
#checkpointTest
#PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
#)
#add_executable(backendTest "")
#target_sources(backendTest
#PRIVATE
#"backendTest.cpp"
#)
#TARGET_LINK_LIBRARIES(
#backendTest
#PUBLIC rocksdb
#PUBLIC os common gtest stream executor qcom index transport util
#)
#TARGET_INCLUDE_DIRECTORIES(
#backendTest
#PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/"
#PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
#)
#add_test(
#NAME streamUpdateTest
#COMMAND streamUpdateTest
#)
#add_test(
#NAME checkpointTest
#COMMAND checkpointTest
#)
#add_test(
#NAME backendTest
#COMMAND backendTest
#)
#add_executable(backendTest "")
#target_sources(backendTest
#PUBLIC
#"backendTest.cpp"
#)
#target_include_directories(
#backendTest
#PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/"
#PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
#)
#target_link_libraries(
#backendTest
#PUBLIC rocksdb
#PUBLIC os common gtest stream executor qcom index transport util
#)
MESSAGE(STATUS "build parser unit test")
IF(NOT TD_DARWIN)
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(backendTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
backendTest
PUBLIC rocksdb
PUBLIC os common gtest stream executor qcom index transport util vnode
)
TARGET_INCLUDE_DIRECTORIES(
streamUpdateTest
TARGET_INCLUDE_DIRECTORIES(
backendTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
)
)
ADD_EXECUTABLE(checkpointTest checkpointTest.cpp)
TARGET_LINK_LIBRARIES(
checkpointTest
PUBLIC os common gtest stream executor qcom index transport util
)
TARGET_INCLUDE_DIRECTORIES(
checkpointTest
PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
)
add_test(
NAME streamUpdateTest
COMMAND streamUpdateTest
)
add_test(
NAME checkpointTest
COMMAND checkpointTest
)
ADD_TEST(
NAME backendTest
COMMAND backendTest
)
ENDIF ()

View File

@ -0,0 +1,437 @@
#include <gtest/gtest.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#include <vector>
#include "streamBackendRocksdb.h"
#include "streamSnapshot.h"
#include "streamState.h"
#include "tstream.h"
#include "tstreamFileState.h"
#include "tstreamUpdate.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wformat"
#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
#pragma GCC diagnostic ignored "-Wpointer-arith"
class BackendEnv : public ::testing::Test {
protected:
virtual void SetUp() {}
virtual void TearDown() {}
};
void *backendCreate() {
const char *streamPath = "/tmp";
void *p = NULL;
// char *absPath = NULL;
// // SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(streamPath, -1, 2);
// STaskDbWrapper *p = taskDbOpen((char *)streamPath, (char *)"stream-backend", -1);
// ASSERT(p != NULL);
return p;
}
SStreamState *stateCreate(const char *path) {
SStreamTask *pTask = (SStreamTask *)taosMemoryCalloc(1, sizeof(SStreamTask));
pTask->ver = 1024;
pTask->id.streamId = 1023;
pTask->id.taskId = 1111111;
SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, 0, 0, NULL);
pTask->pMeta = pMeta;
SStreamState *p = streamStateOpen((char *)path, pTask, true, 32, 32 * 1024);
ASSERT(p != NULL);
return p;
}
void *backendOpen() {
streamMetaInit();
const char *path = "/tmp/backend";
SStreamState *p = stateCreate(path);
ASSERT(p != NULL);
// write bacth
// default/state/fill/sess/func/parname/partag
int32_t size = 100;
std::vector<int64_t> tsArray;
for (int32_t i = 0; i < size; i++) {
int64_t ts = taosGetTimestampMs();
SWinKey key; // = {.groupId = (uint64_t)(i), .ts = ts};
key.groupId = (uint64_t)(i);
key.ts = ts;
const char *val = "value data";
int32_t vlen = strlen(val);
streamStatePut_rocksdb(p, &key, (char *)val, vlen);
tsArray.push_back(ts);
}
for (int32_t i = 0; i < size; i++) {
int64_t ts = tsArray[i];
SWinKey key = {0}; //{.groupId = (uint64_t)(i), .ts = ts};
key.groupId = (uint64_t)(i);
key.ts = ts;
const char *val = "value data";
int32_t len = 0;
char *newVal = NULL;
streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
ASSERT(len == strlen(val));
}
int64_t ts = tsArray[0];
SWinKey key = {0}; // {.groupId = (uint64_t)(0), .ts = ts};
key.groupId = (uint64_t)(0);
key.ts = ts;
streamStateDel_rocksdb(p, &key);
streamStateClear_rocksdb(p);
for (int i = 0; i < size; i++) {
int64_t ts = tsArray[i];
SWinKey key = {0}; //{.groupId = (uint64_t)(i), .ts = ts};
key.groupId = (uint64_t)(i);
key.ts = ts;
const char *val = "value data";
int32_t len = 0;
char *newVal = NULL;
int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
ASSERT(code != 0);
}
tsArray.clear();
for (int i = 0; i < size; i++) {
int64_t ts = taosGetTimestampMs();
tsArray.push_back(ts);
SWinKey key = {0}; //{.groupId = (uint64_t)(i), .ts = ts};
key.groupId = (uint64_t)(i);
key.ts = ts;
const char *val = "value data";
int32_t vlen = strlen(val);
streamStatePut_rocksdb(p, &key, (char *)val, vlen);
}
SWinKey winkey;
int32_t code = streamStateGetFirst_rocksdb(p, &key);
ASSERT(code == 0);
ASSERT(key.ts == tsArray[0]);
SStreamStateCur *pCurr = streamStateSeekToLast_rocksdb(p);
ASSERT(pCurr != NULL);
streamStateFreeCur(pCurr);
winkey.groupId = 0;
winkey.ts = tsArray[0];
char *val = NULL;
int32_t len = 0;
pCurr = streamStateSeekKeyNext_rocksdb(p, &winkey);
ASSERT(pCurr != NULL);
streamStateFreeCur(pCurr);
tsArray.clear();
for (int i = 0; i < size; i++) {
int64_t ts = taosGetTimestampMs();
tsArray.push_back(ts);
STupleKey key = {0};
key.groupId = (uint64_t)(0); //= {.groupId = (uint64_t)(0), .ts = ts, .exprIdx = i};
key.ts = ts;
key.exprIdx = i;
const char *val = "Value";
int32_t len = strlen(val);
streamStateFuncPut_rocksdb(p, &key, val, len);
}
for (int i = 0; i < size; i++) {
STupleKey key = {0}; //{.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i};
key.groupId = (uint64_t)(0);
key.ts = tsArray[i];
key.exprIdx = i;
char *val = NULL;
int32_t len = 0;
streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len);
ASSERT(len == strlen("Value"));
}
for (int i = 0; i < size; i++) {
STupleKey key = {0}; //{.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i};
key.groupId = (uint64_t)(0);
key.ts = tsArray[i];
key.exprIdx = i;
char *val = NULL;
int32_t len = 0;
streamStateFuncDel_rocksdb(p, &key);
}
// session put
tsArray.clear();
for (int i = 0; i < size; i++) {
SSessionKey key = {0}; //{.win = {.skey = i, .ekey = i}, .groupId = (uint64_t)(0)};
key.win.skey = i;
key.win.ekey = i;
key.groupId = (uint64_t)(0);
tsArray.push_back(i);
const char *val = "Value";
int32_t len = strlen(val);
streamStateSessionPut_rocksdb(p, &key, val, len);
char *pval = NULL;
ASSERT(0 == streamStateSessionGet_rocksdb(p, &key, (void **)&pval, &len));
ASSERT(strncmp(pval, val, len) == 0);
}
for (int i = 0; i < size; i++) {
SSessionKey key = {0}; //{.win = {.skey = tsArray[i], .ekey = tsArray[i]}, .groupId = (uint64_t)(0)};
key.win.skey = tsArray[i];
key.win.ekey = tsArray[i];
key.groupId = (uint64_t)(0);
const char *val = "Value";
int32_t len = strlen(val);
char *pval = NULL;
ASSERT(0 == streamStateSessionGet_rocksdb(p, &key, (void **)&pval, &len));
ASSERT(strncmp(pval, val, len) == 0);
taosMemoryFreeClear(pval);
}
pCurr = streamStateSessionSeekToLast_rocksdb(p, 0);
ASSERT(pCurr != NULL);
{
SSessionKey key;
memset(&key, 0, sizeof(key));
char *val = NULL;
int32_t vlen = 0;
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
ASSERT(code == 0);
pCurr = streamStateSessionSeekKeyPrev_rocksdb(p, &key);
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
ASSERT(code == 0);
ASSERT(key.groupId == 0 && key.win.ekey == tsArray[tsArray.size() - 2]);
pCurr = streamStateSessionSeekKeyNext_rocksdb(p, &key);
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
ASSERT(code == 0);
ASSERT(vlen == strlen("Value"));
ASSERT(key.groupId == 0 && key.win.skey == tsArray[tsArray.size() - 1]);
ASSERT(0 == streamStateSessionAddIfNotExist_rocksdb(p, &key, 10, (void **)&val, &len));
ASSERT(0 ==
streamStateStateAddIfNotExist_rocksdb(p, &key, (char *)"key", strlen("key"), NULL, (void **)&val, &len));
}
for (int i = 0; i < size; i++) {
SSessionKey key = {0}; //{.win = {.skey = tsArray[i], .ekey = tsArray[i]}, .groupId = (uint64_t)(0)};
key.win.skey = tsArray[i];
key.win.ekey = tsArray[i];
key.groupId = (uint64_t)(0);
const char *val = "Value";
int32_t len = strlen(val);
char *pval = NULL;
ASSERT(0 == streamStateSessionDel_rocksdb(p, &key));
}
for (int i = 0; i < size; i++) {
SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]};
key.groupId = (uint64_t)(i);
key.ts = tsArray[i];
const char *val = "Value";
int32_t vlen = strlen(val);
ASSERT(streamStateFillPut_rocksdb(p, &key, val, vlen) == 0);
}
for (int i = 0; i < size; i++) {
SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]};
key.groupId = (uint64_t)(i);
key.ts = tsArray[i];
char *val = NULL;
int32_t vlen = 0;
ASSERT(streamStateFillGet_rocksdb(p, &key, (void **)&val, &vlen) == 0);
taosMemoryFreeClear(val);
}
{
SWinKey key = {0}; //{.groupId = (uint64_t)(0), .ts = tsArray[0]};
key.groupId = (uint64_t)(0);
key.ts = tsArray[0];
SStreamStateCur *pCurr = streamStateFillGetCur_rocksdb(p, &key);
ASSERT(pCurr != NULL);
char *val = NULL;
int32_t vlen = 0;
ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen));
ASSERT(vlen == strlen("Value"));
streamStateFreeCur(pCurr);
pCurr = streamStateFillSeekKeyNext_rocksdb(p, &key);
ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen));
ASSERT(vlen == strlen("Value") && key.groupId == 1 && key.ts == tsArray[1]);
key.groupId = 1;
key.ts = tsArray[1];
pCurr = streamStateFillSeekKeyPrev_rocksdb(p, &key);
ASSERT(pCurr != NULL);
ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen));
ASSERT(vlen == strlen("Value") && key.groupId == 0 && key.ts == tsArray[0]);
}
for (int i = 0; i < size - 1; i++) {
SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]};
key.groupId = (uint64_t)(i);
key.ts = tsArray[i];
char *val = NULL;
int32_t vlen = 0;
ASSERT(streamStateFillDel_rocksdb(p, &key) == 0);
taosMemoryFreeClear(val);
}
streamStateSessionClear_rocksdb(p);
for (int i = 0; i < size; i++) {
char tbname[TSDB_TABLE_NAME_LEN] = {0};
sprintf(tbname, "%s_%d", "tbname", i);
ASSERT(0 == streamStatePutParName_rocksdb(p, i, tbname));
}
for (int i = 0; i < size; i++) {
char *val = NULL;
ASSERT(0 == streamStateGetParName_rocksdb(p, i, (void **)&val));
ASSERT(strncmp(val, "tbname", strlen("tbname")) == 0);
taosMemoryFree(val);
}
for (int i = 0; i < size; i++) {
char tbname[TSDB_TABLE_NAME_LEN] = {0};
sprintf(tbname, "%s_%d", "tbname", i);
ASSERT(0 == streamStatePutParName_rocksdb(p, i, tbname));
}
for (int i = 0; i < size; i++) {
char *val = NULL;
ASSERT(0 == streamStateGetParName_rocksdb(p, i, (void **)&val));
ASSERT(strncmp(val, "tbname", strlen("tbname")) == 0);
taosMemoryFree(val);
}
for (int i = 0; i < size; i++) {
char key[128] = {0};
sprintf(key, "tbname_%d", i);
char val[128] = {0};
sprintf(val, "val_%d", i);
code = streamDefaultPut_rocksdb(p, key, val, strlen(val));
ASSERT(code == 0);
}
for (int i = 0; i < size; i++) {
char key[128] = {0};
sprintf(key, "tbname_%d", i);
char *val = NULL;
int32_t len = 0;
code = streamDefaultGet_rocksdb(p, key, (void **)&val, &len);
ASSERT(code == 0);
}
SArray *result = taosArrayInit(8, sizeof(void *));
streamDefaultIterGet_rocksdb(p, "tbname", "tbname_99", result);
ASSERT(taosArrayGetSize(result) >= 0);
return p;
// streamStateClose((SStreamState *)p, true);
}
TEST_F(BackendEnv, checkOpen) {
SStreamState *p = (SStreamState *)backendOpen();
int64_t tsStart = taosGetTimestampMs();
{
void *pBatch = streamStateCreateBatch();
int32_t size = 0;
for (int i = 0; i < size; i++) {
char key[128] = {0};
sprintf(key, "key_%d", i);
char val[128] = {0};
sprintf(val, "val_%d", i);
streamStatePutBatch(p, "default", (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
(int32_t)(strlen(val)), tsStart + 100000);
}
streamStatePutBatch_rocksdb(p, pBatch);
streamStateDestroyBatch(pBatch);
}
{
void *pBatch = streamStateCreateBatch();
int32_t size = 0;
char valBuf[256] = {0};
for (int i = 0; i < size; i++) {
char key[128] = {0};
sprintf(key, "key_%d", i);
char val[128] = {0};
sprintf(val, "val_%d", i);
streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
(int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf);
}
streamStatePutBatch_rocksdb(p, pBatch);
streamStateDestroyBatch(pBatch);
}
// do checkpoint 2
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2);
{
void *pBatch = streamStateCreateBatch();
int32_t size = 0;
char valBuf[256] = {0};
for (int i = 0; i < size; i++) {
char key[128] = {0};
sprintf(key, "key_%d", i);
char val[128] = {0};
sprintf(val, "val_%d", i);
streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
(int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf);
}
streamStatePutBatch_rocksdb(p, pBatch);
streamStateDestroyBatch(pBatch);
}
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 3);
const char *path = "/tmp/backend/stream";
const char *dump = "/tmp/backend/stream/dump";
// taosMkDir(dump);
taosMulMkDir(dump);
SBkdMgt *mgt = bkdMgtCreate((char *)path);
SArray *result = taosArrayInit(4, sizeof(void *));
bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump);
bkdMgtDestroy(mgt);
streamStateClose((SStreamState *)p, true);
taosRemoveDir(path);
}
TEST_F(BackendEnv, backendChkp) { const char *path = "/tmp"; }
typedef struct BdKV {
uint32_t k;
uint32_t v;
} BdKV;
BdKV kvDict[] = {{0, 2}, {1, 2}, {15, 16}, {31, 32}, {56, 64}, {100, 128},
{200, 256}, {500, 512}, {1000, 1024}, {2000, 2048}, {3000, 4096}};
TEST_F(BackendEnv, backendUtil) {
for (int i = 0; i < sizeof(kvDict) / sizeof(kvDict[0]); i++) {
ASSERT_EQ(nextPow2((uint32_t)(kvDict[i].k)), kvDict[i].v);
}
}
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -25,46 +25,49 @@
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
// tsSnodeAddress = "";
// tsS3StreamEnabled = 0;
#include "cos.h"
#include "rsync.h"
#include "streamInt.h"
#include "cos.h"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
// int main(int argc, char **argv) {
// testing::InitGoogleTest(&argc, argv);
if (taosInitCfg("/etc/taos/", NULL, NULL, NULL, NULL, 0) != 0) {
printf("error");
}
if (s3Init() < 0) {
return -1;
}
strcpy(tsSnodeAddress, "127.0.0.1");
int ret = RUN_ALL_TESTS();
s3CleanUp();
return ret;
}
// if (taosInitCfg("/etc/taos/", NULL, NULL, NULL, NULL, 0) != 0) {
// printf("error");
// }
// if (s3Init() < 0) {
// return -1;
// }
// strcpy(tsSnodeAddress, "127.0.0.1");
// int ret = RUN_ALL_TESTS();
// s3CleanUp();
// return ret;
// }
TEST(testCase, checkpointUpload_Test) {
stopRsync();
startRsync();
// stopRsync();
// startRsync();
taosSsleep(5);
char* id = "2013892036";
uploadCheckpoint(id, "/root/offset/");
// uploadCheckpoint(id, "/root/offset/");
}
TEST(testCase, checkpointDownload_Test) {
char* id = "2013892036";
downloadCheckpoint(id, "/root/offset/download/");
// downloadCheckpoint(id, "/root/offset/download/");
}
TEST(testCase, checkpointDelete_Test) {
char* id = "2013892036";
deleteCheckpoint(id);
// deleteCheckpoint(id);
}
TEST(testCase, checkpointDeleteFile_Test) {
char* id = "2013892036";
deleteCheckpointFile(id, "offset-ver0");
// deleteCheckpointFile(id, "offset-ver0");
}

View File

@ -14,10 +14,7 @@ class StreamStateEnv : public ::testing::Test {
streamMetaInit();
backend = streamBackendInit(path, 0, 0);
}
virtual void TearDown() {
streamMetaCleanup();
// indexClose(index);
}
virtual void TearDown() { streamMetaCleanup(); }
const char *path = TD_TMP_DIR_PATH "stream";
void *backend;
@ -50,6 +47,14 @@ bool equalSBF(SScalableBf *left, SScalableBf *right) {
}
TEST(TD_STREAM_UPDATE_TEST, update) {
const char *streamPath = "/tmp";
char *absPath = NULL;
void *p = NULL;
// SBackendWrapper *p = streamBackendInit(streamPath, -1, 2);
// p = taskDbOpen((char *)streamPath, (char *)"test", -1);
p = bkdMgtCreate((char *)streamPath);
// const int64_t interval = 20 * 1000;
// const int64_t watermark = 10 * 60 * 1000;
// SUpdateInfo *pSU = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);

View File

@ -1343,7 +1343,7 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->raftCfg
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
int32_t ret = 0;
if (syncIsInit()) {
taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid, syncEnv()->pTimerManager,
&pSyncNode->pPingTimer);
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
} else {
@ -1415,8 +1415,8 @@ void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = 0;
if (syncIsInit()) {
taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
&pSyncNode->pHeartbeatTimer);
taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer);
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
} else {
sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
@ -2153,7 +2153,11 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex
static void syncNodeEqPingTimer(void* param, void* tmrId) {
if (!syncIsInit()) return;
SSyncNode* pNode = param;
int64_t rid = (int64_t)param;
SSyncNode* pNode = syncNodeAcquire(rid);
if (pNode == NULL) return;
if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
SRpcMsg rpcMsg = {0};
int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
@ -2173,7 +2177,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
}
_out:
taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
&pNode->pPingTimer);
}
}
@ -2224,7 +2229,11 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
if (!syncIsInit()) return;
SSyncNode* pNode = param;
int64_t rid = (int64_t)param;
SSyncNode* pNode = syncNodeAcquire(rid);
if (pNode == NULL) return;
if (pNode->totalReplicaNum > 1) {
if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
SRpcMsg rpcMsg = {0};
@ -2245,7 +2254,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
}
_out:
taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, pNode, syncEnv()->pTimerManager,
taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
&pNode->pHeartbeatTimer);
} else {

View File

@ -316,7 +316,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
}
// 3. Try to Recycle a page
if (!pPage && !pCache->lru.pLruPrev->isAnchor) {
if (!pPageH && !pPage && !pCache->lru.pLruPrev->isAnchor) {
pPage = pCache->lru.pLruPrev;
tdbPCacheRemovePageFromHash(pCache, pPage);
tdbPCachePinPage(pCache, pPage);

View File

@ -89,12 +89,14 @@ static int32_t taosArrayResize(SArray* pArray) {
int32_t taosArrayEnsureCap(SArray* pArray, size_t newCap) {
if (newCap > pArray->capacity) {
float factor = BOUNDARY_BIG_FACTOR;
if(newCap * pArray->elemSize > BOUNDARY_SIZE){
if (newCap * pArray->elemSize > BOUNDARY_SIZE) {
factor = BOUNDARY_SMALL_FACTOR;
}
size_t tsize = (pArray->capacity * factor);
while (newCap > tsize) {
tsize = (tsize * factor);
size_t newSize = (tsize * factor);
tsize = (newSize == tsize) ? (tsize + 2) : newSize;
}
pArray->pData = taosMemoryRealloc(pArray->pData, tsize * pArray->elemSize);

View File

@ -329,6 +329,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VIEW_NOT_EXIST, "view not exists in db
//mnode-compact
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_COMPACT_ID, "Invalid compact id")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST, "compact detail doesn't exist")
// dnode
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_OFFLINE, "Dnode is offline")

View File

@ -21,6 +21,40 @@
int64_t tsRpcQueueMemoryAllowed = 0;
int64_t tsRpcQueueMemoryUsed = 0;
struct STaosQueue {
STaosQnode *head;
STaosQnode *tail;
STaosQueue *next; // for queue set
STaosQset *qset; // for queue set
void *ahandle; // for queue set
FItem itemFp;
FItems itemsFp;
TdThreadMutex mutex;
int64_t memOfItems;
int32_t numOfItems;
int64_t threadId;
int64_t memLimit;
int64_t itemLimit;
};
struct STaosQset {
STaosQueue *head;
STaosQueue *current;
TdThreadMutex mutex;
tsem_t sem;
int32_t numOfQueues;
int32_t numOfItems;
};
struct STaosQall {
STaosQnode *current;
STaosQnode *start;
int32_t numOfItems;
int64_t memOfItems;
int32_t unAccessedNumOfItems;
int64_t unAccessMemOfItems;
};
void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t cap) { queue->memLimit = cap; }
void taosSetQueueCapacity(STaosQueue *queue, int64_t size) { queue->itemLimit = size; }
@ -497,6 +531,12 @@ int64_t taosQallUnAccessedMemSize(STaosQall *qall) { return qall->unAccessMemOfI
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
void taosQueueSetThreadId(STaosQueue* pQueue, int64_t threadId) {
pQueue->threadId = threadId;
}
int64_t taosQueueGetThreadId(STaosQueue *pQueue) { return pQueue->threadId; }
#if 0
void taosResetQsetThread(STaosQset *qset, void *pItem) {

View File

@ -20,6 +20,9 @@
#define DEFAULT_GROWTH 2
#define DEFAULT_TIGHTENING_RATIO 0.5
#define DEFAULT_MAX_BLOOMFILTERS 4
#define SBF_INVALID -1
#define SBF_VALID 0
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate);
@ -32,6 +35,8 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) {
if (pSBf == NULL) {
return NULL;
}
pSBf->maxBloomFilters = DEFAULT_MAX_BLOOMFILTERS;
pSBf->status = SBF_VALID;
pSBf->numBits = 0;
pSBf->bfArray = taosArrayInit(defaultSize, sizeof(void *));
if (tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO) == NULL) {
@ -45,6 +50,9 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) {
}
int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
if (pSBf->status == SBF_INVALID) {
return TSDB_CODE_FAILED;
}
int32_t size = taosArrayGetSize(pSBf->bfArray);
SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
ASSERT(pNormalBf);
@ -52,6 +60,7 @@ int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t le
pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO);
if (pNormalBf == NULL) {
pSBf->status = SBF_INVALID;
return TSDB_CODE_OUT_OF_MEMORY;
}
}
@ -59,6 +68,9 @@ int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t le
}
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
if (pSBf->status == SBF_INVALID) {
return TSDB_CODE_FAILED;
}
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
int32_t size = taosArrayGetSize(pSBf->bfArray);
@ -74,6 +86,7 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO);
if (pNormalBf == NULL) {
pSBf->status = SBF_INVALID;
return TSDB_CODE_OUT_OF_MEMORY;
}
}
@ -81,6 +94,9 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
}
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
if (pSBf->status == SBF_INVALID) {
return TSDB_CODE_FAILED;
}
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
int32_t size = taosArrayGetSize(pSBf->bfArray);
@ -93,6 +109,10 @@ int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32
}
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate) {
if (taosArrayGetSize(pSBf->bfArray) >= pSBf->maxBloomFilters) {
return NULL;
}
SBloomFilter *pNormalBf = tBloomFilterInit(expectedEntries, errorRate);
if (pNormalBf == NULL) {
return NULL;
@ -128,6 +148,8 @@ int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder) {
}
if (tEncodeU32(pEncoder, pSBf->growth) < 0) return -1;
if (tEncodeU64(pEncoder, pSBf->numBits) < 0) return -1;
if (tEncodeU32(pEncoder, pSBf->maxBloomFilters) < 0) return -1;
if (tEncodeI8(pEncoder, pSBf->status) < 0) return -1;
return 0;
}
@ -150,6 +172,8 @@ SScalableBf *tScalableBfDecode(SDecoder *pDecoder) {
}
if (tDecodeU32(pDecoder, &pSBf->growth) < 0) goto _error;
if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) goto _error;
if (tDecodeU32(pDecoder, &pSBf->maxBloomFilters) < 0) goto _error;
if (tDecodeI8(pDecoder, &pSBf->status) < 0) goto _error;
return pSBf;
_error:

View File

@ -417,9 +417,9 @@ _OVER:
return NULL;
} else {
while (worker->pid <= 0) taosMsleep(10);
queue->threadId = worker->pid;
uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle,
queue->threadId);
taosQueueSetThreadId(queue, worker->pid);
uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, worker->pid);
return queue;
}
}

View File

@ -46,7 +46,9 @@ class TDTestCase(TBase):
# clusterDnodes.starttaosd(1)
# time.sleep(5)
autoGen.insert_data(5000, True)
tdSql.execute(f"flush database {self.db}")
self.flushDb(True)
# wait flush operation over
time.sleep(5)
# sql = 'show vnodes;'
# while True:

View File

@ -1211,6 +1211,7 @@
,,y,script,./test.sh -f tsim/stream/deleteState.sim
,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim
,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
,,y,script,./test.sh -f tsim/stream/distributeMultiLevelInterval0.sim
,,y,script,./test.sh -f tsim/stream/distributeSession0.sim
,,y,script,./test.sh -f tsim/stream/drop_stream.sim
,,y,script,./test.sh -f tsim/stream/event0.sim

View File

@ -214,7 +214,7 @@ function lcovFunc {
'*/clientJniConnector.c' '*/clientTmqConnector.c' '*/version.c' '*/build_version.cc'\
'*/tthread.c' '*/tversion.c' '*/ctgDbg.c' '*/schDbg.c' '*/qwDbg.c' '*/tencode.h' \
'*/shellAuto.c' '*/shellTire.c' '*/shellCommand.c'\
'*/sql.c' '*/sql.y'\
'*/sql.c' '*/sql.y' '*/smaSnapshot.c' '*/smaCommit.c'\
--branch-coverage --function-coverage -o coverage.info
# generate result

View File

@ -9,8 +9,6 @@ print =============== create database
sql create database test vgroups 1;
sql use test;
sql alter local 'disableCount' '0' ;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql insert into t1 values(1648791213000,0,1,1,1.0);

View File

@ -9,8 +9,6 @@ print =============== create database
sql create database test vgroups 4;
sql use test;
sql alter local 'disableCount' '0' ;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);

View File

@ -9,8 +9,6 @@ print =============== create database
sql create database test vgroups 1;
sql use test;
sql alter local 'disableCount' '0' ;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql insert into t1 values(1648791213000,0,1,1,1.0);

View File

@ -0,0 +1,267 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c streamAggCnt -v 2
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print ===== step1
sql drop stream if exists streams1;
sql drop database if exists test;
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create table ts3 using st tags(3,2,2);
sql create table ts4 using st tags(4,2,2);
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4 from st interval(10s);
sleep 1000
sql insert into ts1 values(1648791213000,1,1,3,4.1);
sql insert into ts1 values(1648791223000,2,2,3,1.1);
sql insert into ts1 values(1648791233000,3,3,3,2.1);
sql insert into ts1 values(1648791243000,4,4,3,3.1);
sql insert into ts2 values(1648791213000,1,5,3,4.1);
sql insert into ts2 values(1648791223000,2,6,3,1.1);
sql insert into ts2 values(1648791233000,3,7,3,2.1);
sql insert into ts2 values(1648791243000,4,8,3,3.1);
$loop_count = 0
loop0:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 1000
print 2 select * from streamt1;
sql select * from streamt1;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
if $rows != 4 then
print =====rows=$rows
goto loop0
endi
if $data01 != 2 then
print =====data01=$data01
goto loop0
endi
if $data11 != 2 then
print =====data11=$data11
goto loop0
endi
if $data21 != 2 then
print =====data21=$data21
goto loop0
endi
if $data31 != 2 then
print =====data31=$data31
goto loop0
endi
sql insert into ts1 values(1648791213000,1,9,3,4.1);
$loop_count = 0
loop1:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 1000
print 2 select * from streamt1;
sql select * from streamt1;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
if $rows != 4 then
print =====rows=$rows
goto loop1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop1
endi
if $data11 != 2 then
print =====data11=$data11
goto loop1
endi
if $data21 != 2 then
print =====data21=$data21
goto loop1
endi
if $data31 != 2 then
print =====data31=$data31
goto loop1
endi
sql delete from ts2 where ts = 1648791243000 ;
$loop_count = 0
loop2:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 1000
print 2 select * from streamt1;
sql select * from streamt1;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
if $rows != 4 then
print =====rows=$rows
goto loop2
endi
if $data01 != 2 then
print =====data01=$data01
goto loop2
endi
if $data11 != 2 then
print =====data11=$data11
goto loop2
endi
if $data21 != 2 then
print =====data21=$data21
goto loop2
endi
if $data31 != 1 then
print =====data31=$data31
goto loop2
endi
sql delete from ts2 where ts = 1648791223000 ;
$loop_count = 0
loop3:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 1000
print 2 select * from streamt1;
sql select * from streamt1;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
if $rows != 4 then
print =====rows=$rows
goto loop3
endi
if $data01 != 2 then
print =====data01=$data01
goto loop3
endi
if $data11 != 1 then
print =====data11=$data11
goto loop3
endi
if $data21 != 2 then
print =====data21=$data21
goto loop3
endi
if $data31 != 1 then
print =====data31=$data31
goto loop3
endi
sql insert into ts1 values(1648791233001,3,9,3,2.1);
$loop_count = 0
loop4:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 1000
print 2 select * from streamt1;
sql select * from streamt1;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
if $rows != 4 then
print =====rows=$rows
goto loop4
endi
if $data01 != 2 then
print =====data01=$data01
goto loop4
endi
if $data11 != 1 then
print =====data11=$data11
goto loop4
endi
if $data21 != 3 then
print =====data21=$data21
goto loop4
endi
if $data31 != 1 then
print =====data31=$data31
goto loop4
endi
sql select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s);
print ===== over
system sh/stop_dnodes.sh

View File

@ -206,6 +206,7 @@ SWords shellCommands[] = {
{"show grants full;", 0, 0, NULL},
{"show grants logs;", 0, 0, NULL},
#ifdef TD_ENTERPRISE
{"show views;", 0, 0, NULL},
{"split vgroup <vgroup_id>", 0, 0, NULL},
#endif
{"insert into <tb_name> values(", 0, 0, NULL},
@ -570,6 +571,7 @@ void showHelp() {
split vgroup <vgroup_id>;\n\
show compacts;\n\
show compact \n\
show views;\n\
show create view <all_table>;");
#endif

View File

@ -1019,7 +1019,7 @@ void shellReadHistory() {
char *line = taosMemoryMalloc(TSDB_MAX_ALLOWED_SQL_LEN + 1);
int32_t read_size = 0;
while ((read_size = taosGetsFile(pFile, TSDB_MAX_ALLOWED_SQL_LEN, line)) != -1) {
while ((read_size = taosGetsFile(pFile, TSDB_MAX_ALLOWED_SQL_LEN, line)) > 0) {
line[read_size - 1] = '\0';
taosMemoryFree(pHistory->hist[pHistory->hend]);
pHistory->hist[pHistory->hend] = taosStrdup(line);

View File

@ -1018,7 +1018,7 @@ int sml_escape_Test() {
ASSERT(numFields == 5);
ASSERT(strncmp(fields[1].name, "inode\"i,= s_used", sizeof("inode\"i,= s_used") - 1) == 0);
ASSERT(strncmp(fields[2].name, "total", sizeof("total") - 1) == 0);
ASSERT(strncmp(fields[3].name, "inode\"i,= s_f\\\\ree", sizeof("inode\"i,= s_f\\\\ree") - 1) == 0);
ASSERT(strncmp(fields[3].name, "inode\"i,= s_f\\ree", sizeof("inode\"i,= s_f\\ree") - 1) == 0);
ASSERT(strncmp(fields[4].name, "dev\"i,= ce", sizeof("dev\"i,= ce") - 1) == 0);
TAOS_ROW row = NULL;
@ -1044,6 +1044,88 @@ int sml_escape_Test() {
return code;
}
// test field with end of escape
int sml_escape1_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "create database if not exists db_escape");
taos_free_result(pRes);
pRes = taos_query(taos, "use db_escape");
taos_free_result(pRes);
const char *sql[] = {
"stab,t1\\=1 c1=3,c2=\"32fw\" 1661943970000000000",
"stab,t1=1\\ c1=3,c2=\"32fw\" 1661943980000000000",
"stab,t1=1 c1\\=3,c2=\"32fw\" 1661943990000000000",
};
for(int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++){
pRes = taos_schemaless_insert(taos, (char**)&sql[i], 1, TSDB_SML_LINE_PROTOCOL, 0);
int code = taos_errno(pRes);
ASSERT(code);
}
const char *sql1[] = {
"stab\\,t1=1 c1=3,c2=\"32fw\" 1661943960000000000",
"stab\\\\,t1=1 c1=3,c2=\"32fw\" 1661943960000000000",
"stab,t1\\\\=1 c1=3,c2=\"32fw\" 1661943970000000000",
"stab,t1=1\\\\ c1=3,c2=\"32fw\" 1661943980000000000",
"stab,t1=1 c1\\\\=3,c2=\"32fw\" 1661943990000000000",
};
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL, 0);
printf("%s result:%s, rows:%d\n", __FUNCTION__, taos_errstr(pRes), taos_affected_rows(pRes));
int code = taos_errno(pRes);
ASSERT(!code);
ASSERT(taos_affected_rows(pRes) == 5);
taos_free_result(pRes);
pRes = taos_query(taos, "select * from stab"); //check stable name
ASSERT(pRes);
int fieldNum = taos_field_count(pRes);
ASSERT(fieldNum == 6);
printf("fieldNum:%d\n", fieldNum);
int numFields = taos_num_fields(pRes);
TAOS_FIELD *fields = taos_fetch_fields(pRes);
ASSERT(numFields == 6);
ASSERT(strncmp(fields[1].name, "c1", sizeof("c1") - 1) == 0);
ASSERT(strncmp(fields[2].name, "c2", sizeof("c2") - 1) == 0);
ASSERT(strncmp(fields[3].name, "c1\\", sizeof("c1\\") - 1) == 0);
ASSERT(strncmp(fields[4].name, "t1\\", sizeof("t1\\") - 1) == 0);
ASSERT(strncmp(fields[5].name, "t1", sizeof("t1") - 1) == 0);
TAOS_ROW row = NULL;
int32_t rowIndex = 0;
while ((row = taos_fetch_row(pRes)) != NULL) {
int64_t ts = *(int64_t *)row[0];
if (ts == 1661943970000) {
ASSERT(*(double *)row[1] == 3);
ASSERT(strncmp(row[2], "32fw", sizeof("32fw") - 1) == 0);
ASSERT(row[3] == NULL);
ASSERT(strncmp(row[4], "1", sizeof("1") - 1) == 0);
ASSERT(row[5] == NULL);
}else if (ts == 1661943980000) {
ASSERT(*(double *)row[1] == 3);
ASSERT(strncmp(row[2], "32fw", sizeof("32fw") - 1) == 0);
ASSERT(row[3] == NULL);
ASSERT(row[4] == NULL);
ASSERT(strncmp(row[5], "1\\", sizeof("1\\") - 1) == 0);
}else if (ts == 1661943990000) {
ASSERT(row[1] == NULL);
ASSERT(strncmp(row[2], "32fw", sizeof("32fw") - 1) == 0);
ASSERT(*(double *)row[3] == 3);
ASSERT(row[4] == NULL);
ASSERT(strncmp(row[5], "1", sizeof("1") - 1) == 0);
}
rowIndex++;
}
taos_free_result(pRes);
taos_close(taos);
return code;
}
int sml_19221_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -1775,17 +1857,14 @@ int main(int argc, char *argv[]) {
ASSERT(ret);
ret = sml_escape_Test();
ASSERT(!ret);
ret = sml_escape1_Test();
ASSERT(!ret);
ret = sml_ts3116_Test();
ASSERT(!ret);
ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file
ASSERT(!ret);
ret = sml_ts3303_Test();
ASSERT(!ret);
// for(int i = 0; i < sizeof(str)/sizeof(str[0]); i++){
// printf("str:%s \t %d\n", str[i], smlCalTypeSum(str[i], strlen(str[i])));
// }
// int ret = 0;
ret = sml_ttl_Test();
ASSERT(!ret);
ret = sml_ts2164_Test();