Merge branch '3.0' into merge/3.0tomain

This commit is contained in:
Shengliang Guan 2024-12-27 22:27:16 +08:00
commit 2161399eec
42 changed files with 414 additions and 379 deletions

View File

@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG main
GIT_TAG 3.0
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG main
GIT_TAG 3.0
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -2,7 +2,7 @@
# taosws-rs
ExternalProject_Add(taosws-rs
GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git
GIT_TAG main
GIT_TAG 3.0
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -190,7 +190,8 @@ The effective value of charset is UTF-8.
|Parameter Name |Supported Version |Dynamic Modification|Description|
|-----------------------|-------------------------|--------------------|------------|
|supportVnodes | |Supported, effective immediately |Maximum number of vnodes supported by a dnode, range 0-4096, default value is twice the number of CPU cores + 5|
|numOfCommitThreads | |Supported, effective after restart|Maximum number of commit threads, range 0-1024, default value 4|
|numOfCommitThreads | |Supported, effective after restart|Maximum number of commit threads, range 1-1024, default value 4|
|numOfCompactThreads | |Supported, effective after restart|Maximum number of commit threads, range 1-16, default value 2|
|numOfMnodeReadThreads | |Supported, effective after restart|Number of Read threads for mnode, range 0-1024, default value is one quarter of the CPU cores (not exceeding 4)|
|numOfVnodeQueryThreads | |Supported, effective after restart|Number of Query threads for vnode, range 0-1024, default value is twice the number of CPU cores (not exceeding 16)|
|numOfVnodeFetchThreads | |Supported, effective after restart|Number of Fetch threads for vnode, range 0-1024, default value is one quarter of the CPU cores (not exceeding 4)|

View File

@ -185,7 +185,8 @@ charset 的有效值是 UTF-8。
|参数名称|支持版本|动态修改|参数含义|
|--------------------------|----------|-------------------------|-|
|supportVnodes | |支持动态修改 立即生效 |dnode 支持的最大 vnode 数目,取值范围 0-4096默认值 CPU 核数的 2 倍 + 5|
|numOfCommitThreads | |支持动态修改 重启生效 |落盘线程的最大数量,取值范围 0-1024默认值为 4|
|numOfCommitThreads | |支持动态修改 重启生效 |落盘线程的最大数量,取值范围 1-1024默认值为 4|
|numOfCompactThreads | |支持动态修改 重启生效 |落盘线程的最大数量,取值范围 1-16默认值为 2|
|numOfMnodeReadThreads | |支持动态修改 重启生效 |mnode 的 Read 线程数目,取值范围 0-1024默认值为 CPU 核数的四分之一(不超过 4|
|numOfVnodeQueryThreads | |支持动态修改 重启生效 |vnode 的 Query 线程数目,取值范围 0-1024默认值为 CPU 核数的两倍(不超过 16|
|numOfVnodeFetchThreads | |支持动态修改 重启生效 |vnode 的 Fetch 线程数目,取值范围 0-1024默认值为 CPU 核数的四分之一(不超过 4|

View File

@ -182,7 +182,7 @@ void tColDataClear(SColData *pColData);
void tColDataDeepClear(SColData *pColData);
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, bool forward);
void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal);
int32_t tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal);
uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal);
int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg);
void tColDataArrGetRowKey(SColData *aColData, int32_t nColData, int32_t iRow, SRowKey *key);

View File

@ -112,9 +112,8 @@ extern int32_t tsNumOfSnodeWriteThreads;
extern int64_t tsQueueMemoryAllowed;
extern int32_t tsRetentionSpeedLimitMB;
extern const char *tsAlterCompactTaskKeywords;
extern int32_t tsNumOfCompactThreads;
extern int32_t tsNumOfRetentionThreads;
extern int32_t tsNumOfCompactThreads;
extern int32_t tsNumOfRetentionThreads;
// sync raft
extern int32_t tsElectInterval;

View File

@ -848,6 +848,7 @@ static int stmtSetDbName2(TAOS_STMT2* stmt, const char* dbName) {
STMT_DLOG("start to set dbName: %s", dbName);
pStmt->db = taosStrdup(dbName);
(void)strdequote(pStmt->db);
STMT_ERR_RET(stmtCreateRequest(pStmt));
// The SQL statement specifies a database name, overriding the previously specified database

View File

@ -126,6 +126,10 @@ void fetchCallback(void* param, void* res, int32_t numOfRow) {
void queryCallback(void* param, void* res, int32_t code) {
if (code != TSDB_CODE_SUCCESS) {
(void)printf("failed to execute, reason:%s\n", taos_errstr(res));
taos_free_result(res);
tsem_t *sem = (tsem_t *)param;
tsem_post(sem);
return;
}
(void)printf("start to fetch data\n");
taos_fetch_raw_block_a(res, fetchCallback, param);

View File

@ -2671,8 +2671,13 @@ static void (*tColDataGetValueImpl[])(SColData *pColData, int32_t iVal, SColVal
tColDataGetValue6, // HAS_VALUE | HAS_NULL
tColDataGetValue7 // HAS_VALUE | HAS_NULL | HAS_NONE
};
void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) {
int32_t tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) {
if (iVal < 0 || iVal >= pColData->nVal ||
(pColData->flag <= 0 || pColData->flag >= sizeof(tColDataGetValueImpl)/POINTER_BYTES)){
return TSDB_CODE_INVALID_PARA;
}
tColDataGetValueImpl[pColData->flag](pColData, iVal, pColVal);
return TSDB_CODE_SUCCESS;
}
uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal) {
@ -3436,7 +3441,10 @@ static int32_t tColDataCopyRowAppend(SColData *aFromColData, int32_t iFromRow, S
for (int32_t i = 0; i < nColData; i++) {
SColVal cv = {0};
tColDataGetValue(&aFromColData[i], iFromRow, &cv);
code = tColDataGetValue(&aFromColData[i], iFromRow, &cv);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = tColDataAppendValue(&aToColData[i], &cv);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -3575,7 +3583,10 @@ static int32_t tColDataMerge(SArray **colArr) {
SColData *dstCol = taosArrayGet(dst, j);
SColVal cv;
tColDataGetValue(srcCol, i, &cv);
code = tColDataGetValue(srcCol, i, &cv);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
code = tColDataAppendValue(dstCol, &cv);
if (code) {
goto _exit;
@ -3588,7 +3599,10 @@ static int32_t tColDataMerge(SArray **colArr) {
SColData *dstCol = taosArrayGet(dst, j);
SColVal cv;
tColDataGetValue(srcCol, i, &cv);
code = tColDataGetValue(srcCol, i, &cv);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
code = tColDataUpdateValue(dstCol, &cv, true);
if (code) {
goto _exit;

View File

@ -102,9 +102,8 @@ int32_t tsMaxStreamBackendCache = 128; // M
int32_t tsPQSortMemThreshold = 16; // M
int32_t tsRetentionSpeedLimitMB = 0; // unlimited
const char *tsAlterCompactTaskKeywords = "max_compact_tasks";
int32_t tsNumOfCompactThreads = 2;
int32_t tsNumOfRetentionThreads = 1;
int32_t tsNumOfCompactThreads = 2;
int32_t tsNumOfRetentionThreads = 1;
// sync raft
int32_t tsElectInterval = 25 * 1000;
@ -745,8 +744,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tsmaDataDeleteMark", tsmaDataDeleteMark, 60 * 60 * 1000, INT64_MAX,
CFG_SCOPE_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "streamCoverage", tsStreamCoverage, CFG_DYN_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(
cfgAddBool(pCfg, "streamCoverage", tsStreamCoverage, CFG_DYN_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL));
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -795,9 +795,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfCommitThreads = tsNumOfCores / 2;
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
tsNumOfCompactThreads = tsNumOfCommitThreads;
tsNumOfCompactThreads = TRANGE(tsNumOfCompactThreads, 2, 4);
tsNumOfSupportVnodes = tsNumOfCores * 2 + 5;
tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2);
@ -842,7 +839,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY, CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "maxCompactConcurrency", tsNumOfCompactThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfCompactThreads", tsNumOfCompactThreads, 1, 16, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "queryUseMemoryPool", tsQueryUseMemoryPool, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL) != 0);
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "memPoolFullFunc", tsMemPoolFullFunc, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL) != 0);
@ -1040,10 +1037,8 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype;
}
pItem = cfgGetItem(pCfg, "maxCompactConcurrency");
pItem = cfgGetItem(pCfg, "numOfCompactThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfCompactThreads = numOfCores / 2;
tsNumOfCompactThreads = TRANGE(tsNumOfCompactThreads, 2, 4);
pItem->i32 = tsNumOfCompactThreads;
pItem->stype = stype;
}
@ -1548,7 +1543,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "numOfCommitThreads");
tsNumOfCommitThreads = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "maxCompactConcurrency");
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "numOfCompactThreads");
tsNumOfCompactThreads = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "retentionSpeedLimitMB");
@ -2354,6 +2349,8 @@ static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize,
TAOS_RETURN(code);
}
extern void tsdbAlterNumCompactThreads();
static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = -1;
@ -2404,6 +2401,17 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
goto _exit;
}
if (strcasecmp(name, "numOfCompactThreads") == 0) {
#ifdef TD_ENTERPRISE
tsNumOfCompactThreads = pItem->i32;
code = TSDB_CODE_SUCCESS;
// tsdbAlterNumCompactThreads();
#else
code = TSDB_CODE_INVALID_CFG;
#endif
goto _exit;
}
{ // 'bool/int32_t/int64_t/float/double' variables with general modification function
static OptionNameAndVar debugOptions[] = {
{"dDebugFlag", &dDebugFlag}, {"vDebugFlag", &vDebugFlag},

View File

@ -449,6 +449,20 @@ static void checkTSRow(const char **data, STSRow *row, STSchema *pTSchema) {
checkSColVal(data[i], &colVal, pCol->type);
}
}
#ifndef WINDOWS
TEST(testCase, tColDataGetValue) {
SColData pColData = {0};
SColVal pColVal = {0};
ASSERT_NE(tColDataGetValue(&pColData, 0, &pColVal),0);
pColData = {.flag = 8};
pColVal = {0};
ASSERT_NE(tColDataGetValue(&pColData, 0, &pColVal),0);
pColData = {.nVal = 1, .flag = 8};
ASSERT_NE(tColDataGetValue(&pColData, 0, &pColVal),0);
}
#endif
TEST(testCase, AllNormTest) {
int16_t nCols = 14;

View File

@ -475,27 +475,6 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0;
}
extern void tsdbAlterNumCompactThreads();
static int32_t dmAlterMaxCompactTask(const char *value) {
int32_t max_compact_tasks;
char *endptr = NULL;
max_compact_tasks = taosStr2Int32(value, &endptr, 10);
if (endptr == value || endptr[0] != '\0') {
return TSDB_CODE_INVALID_MSG;
}
if (max_compact_tasks != tsNumOfCompactThreads) {
dInfo("alter max compact tasks from %d to %d", tsNumOfCompactThreads, max_compact_tasks);
tsNumOfCompactThreads = max_compact_tasks;
#ifdef TD_ENTERPRISE
(void)tsdbAlterNumCompactThreads();
#endif
}
return TSDB_CODE_SUCCESS;
}
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
SDCfgDnodeReq cfgReq = {0};
@ -509,10 +488,6 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
}
if (strncmp(cfgReq.config, tsAlterCompactTaskKeywords, strlen(tsAlterCompactTaskKeywords) + 1) == 0) {
return dmAlterMaxCompactTask(cfgReq.value);
}
dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);

View File

@ -129,7 +129,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_USER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -325,7 +325,6 @@ int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8
ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (ctx->suidInfo == NULL) {
return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
;
}
taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable);

View File

@ -796,7 +796,8 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
sourceIdx++;
} else if (pCol->cid == pColData->info.colId) {
for (int32_t i = 0; i < pCol->nVal; i++) {
tColDataGetValue(pCol, i, &colVal);
code = tColDataGetValue(pCol, i, &colVal);
TSDB_CHECK_CODE(code, line, END);
code = doSetVal(pColData, i, &colVal);
TSDB_CHECK_CODE(code, line, END);
}
@ -937,7 +938,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData
pCol = taosArrayGet(pCols, j);
TQ_NULL_GO_TO_END(pCol);
SColVal colVal = {0};
tColDataGetValue(pCol, i, &colVal);
TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
PROCESS_VAL
}
@ -961,7 +962,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
TQ_NULL_GO_TO_END(pColData);
SColVal colVal = {0};
tColDataGetValue(pCol, i, &colVal);
TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
SET_DATA
}

View File

@ -1715,7 +1715,7 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo
uint8_t colType = tColDataGetBitValue(pColData, tRow.iRow);
if (colType == 2) {
SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type);
tColDataGetValue(pColData, tRow.iRow, &colVal);
TAOS_CHECK_GOTO(tColDataGetValue(pColData, tRow.iRow, &colVal), &lino, _exit);
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
if (!taosArrayPush(ctxArray, &updateCtx)) {

View File

@ -123,7 +123,8 @@ static int32_t tColRowGetPriamyKeyDeepCopy(SBlockData* pBlock, int32_t irow, int
pColData = &pBlock->aColData[slotId];
tColDataGetValue(pColData, irow, &cv);
code = tColDataGetValue(pColData, irow, &cv);
TSDB_CHECK_CODE(code, lino, _end);
pKey->numOfPKs = 1;
pKey->pks[0].type = cv.value.type;
@ -1603,7 +1604,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro
TSDB_CHECK_CODE(code, lino, _end);
} else { // varchar/nchar type
for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step) {
tColDataGetValue(pData, j, &cv);
code = tColDataGetValue(pData, j, &cv);
TSDB_CHECK_CODE(code, lino, _end);
code = doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
TSDB_CHECK_CODE(code, lino, _end);
}
@ -5282,7 +5284,8 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotId[i]);
if (pData->cid == pSupInfo->colId[i]) {
tColDataGetValue(pData, rowIndex, &cv);
code = tColDataGetValue(pData, rowIndex, &cv);
TSDB_CHECK_CODE(code, lino, _end);
code = doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
TSDB_CHECK_CODE(code, lino, _end);
j += 1;

View File

@ -622,7 +622,9 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
SColData *pColData = tBlockDataGetColData(pRow->pBlockData, pTColumn->colId);
if (pColData) {
tColDataGetValue(pColData, pRow->iRow, pColVal);
if (tColDataGetValue(pColData, pRow->iRow, pColVal) != 0){
tsdbError("failed to tColDataGetValue");
}
} else {
*pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
}
@ -645,7 +647,9 @@ void tColRowGetPrimaryKey(SBlockData *pBlock, int32_t irow, SRowKey *key) {
SColData *pColData = &pBlock->aColData[i];
if (pColData->cflag & COL_IS_KEY) {
SColVal cv;
tColDataGetValue(pColData, irow, &cv);
if (tColDataGetValue(pColData, irow, &cv) != 0){
break;
}
key->pks[key->numOfPKs] = cv.value;
key->numOfPKs++;
} else {
@ -719,7 +723,9 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
}
if (pIter->iColData <= pIter->pRow->pBlockData->nColData) {
tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv);
if (tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv) != 0){
return NULL;
}
++pIter->iColData;
return &pIter->cv;
} else {
@ -1251,7 +1257,8 @@ static int32_t tBlockDataUpsertBlockRow(SBlockData *pBlockData, SBlockData *pBlo
cv = COL_VAL_NONE(pColDataTo->cid, pColDataTo->type);
if (flag == 0 && (code = tColDataAppendValue(pColDataTo, &cv))) goto _exit;
} else {
tColDataGetValue(pColDataFrom, iRow, &cv);
code = tColDataGetValue(pColDataFrom, iRow, &cv);
if (code) goto _exit;
if (flag) {
code = tColDataUpdateValue(pColDataTo, &cv, flag > 0);

View File

@ -243,7 +243,7 @@ int32_t vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
code = TSDB_CODE_VND_HASH_MISMATCH;
goto _exit;
} else if (mer1.me.type == TSDB_CHILD_TABLE) {
metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_LOCK);
metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK);
if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
tstrncpy(cfgRsp.stbName, mer2.me.name, TSDB_TABLE_NAME_LEN);
@ -279,7 +279,8 @@ int32_t vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
}
} else {
vError("vnodeGetTableCfg get invalid table type:%d", mer1.me.type);
return TSDB_CODE_APP_ERROR;
code = TSDB_CODE_APP_ERROR;
goto _exit;
}
cfgRsp.numOfTags = schemaTag.nCols;

View File

@ -61,9 +61,13 @@ static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock*
int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
size_t keyBufSize = 0;
int32_t num = 0;
SExprInfo* pExprInfo = NULL;
const char* id = GET_TASKID(pTaskInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SAnomalyWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAnomalyWindowOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SAnomalyWindowPhysiNode* pAnomalyNode = (SAnomalyWindowPhysiNode*)physiNode;
@ -74,13 +78,13 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
}
if (!taosAnalGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) {
qError("failed to get anomaly_window algorithm name from %s", pAnomalyNode->anomalyOpt);
qError("%s failed to get anomaly_window algorithm name from %s", id, pAnomalyNode->anomalyOpt);
code = TSDB_CODE_ANA_ALGO_NOT_FOUND;
goto _error;
}
if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) {
qError("failed to get anomaly_window algorithm url from %s", pInfo->algoName);
qError("%s failed to get anomaly_window algorithm url from %s", id, pInfo->algoName);
code = TSDB_CODE_ANA_ALGO_NOT_LOAD;
goto _error;
}
@ -94,20 +98,18 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pAnomalyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
}
size_t keyBufSize = 0;
int32_t num = 0;
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pAnomalyNode->window.pFuncs, NULL, &pExprInfo, &num);
QUERY_CHECK_CODE(code, lino, _error);
initResultSizeInfo(&pOperator->resultInfo, 4096);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, id, pTaskInfo->streamInfo.pState,
&pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pAnomalyNode->window.node.pOutputDataBlockDesc);
@ -124,27 +126,19 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
pInfo->anomalyCol = extractColumnFromColumnNode(pColNode);
pInfo->anomalyKey.type = pInfo->anomalyCol.type;
pInfo->anomalyKey.bytes = pInfo->anomalyCol.bytes;
pInfo->anomalyKey.pData = taosMemoryCalloc(1, pInfo->anomalyCol.bytes);
if (pInfo->anomalyKey.pData == NULL) {
goto _error;
}
QUERY_CHECK_NULL(pInfo->anomalyKey.pData, code, lino, _error, terrno)
int32_t itemSize = sizeof(int32_t) + pInfo->aggSup.resultRowSize + pInfo->anomalyKey.bytes;
pInfo->anomalySup.pResultRow = taosMemoryCalloc(1, itemSize);
if (pInfo->anomalySup.pResultRow == NULL) {
code = terrno;
goto _error;
}
QUERY_CHECK_NULL(pInfo->anomalySup.pResultRow, code, lino, _error, terrno)
pInfo->anomalySup.blocks = taosArrayInit(16, sizeof(SSDataBlock*));
if (pInfo->anomalySup.blocks == NULL) {
code = terrno;
goto _error;
}
QUERY_CHECK_NULL(pInfo->anomalySup.blocks, code, lino, _error, terrno)
pInfo->anomalySup.windows = taosArrayInit(16, sizeof(STimeWindow));
if (pInfo->anomalySup.windows == NULL) {
code = terrno;
goto _error;
}
QUERY_CHECK_NULL(pInfo->anomalySup.windows, code, lino, _error, terrno)
code = filterInitFromNode((SNode*)pAnomalyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
QUERY_CHECK_CODE(code, lino, _error);
@ -162,18 +156,21 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
*pOptrInfo = pOperator;
qDebug("anomaly_window operator is created, algo:%s url:%s opt:%s", pInfo->algoName, pInfo->algoUrl,
qDebug("%s anomaly_window operator is created, algo:%s url:%s opt:%s", id, pInfo->algoName, pInfo->algoUrl,
pInfo->anomalyOpt);
return TSDB_CODE_SUCCESS;
_error:
qError("%s failed to create anomaly_window operator, line:%d algo:%s code:%s", id, lino, pAnomalyNode->anomalyOpt,
tstrerror(code));
if (pInfo != NULL) {
anomalyDestroyOperatorInfo(pInfo);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
qError("failed to create anomaly_window operator, algo:%s code:0x%x", pInfo->algoName, code);
return code;
}

View File

@ -1441,7 +1441,7 @@ static int32_t doSetUserTableMetaInfo(SStoreMetaReader* pMetaReaderFn, SStoreMet
SMetaReader mr1 = {0};
pMetaReaderFn->initReader(&mr1, pVnode, META_READER_NOLOCK, pMetaFn);
int64_t suid = pMReader->me.ctbEntry.suid;
code = pMetaReaderFn->getTableEntryByUid(&mr1, suid);
if (code != TSDB_CODE_SUCCESS) {
@ -1752,7 +1752,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
SMetaReader mr = {0};
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_NOLOCK, &pAPI->metaFn);
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
code = pAPI->metaReaderFn.getTableEntryByUid(&mr, suid);
if (code != TSDB_CODE_SUCCESS) {
@ -2284,7 +2284,7 @@ static SSDataBlock* sysTableBuildUserFileSets(SOperatorInfo* pOperator) {
// db_name
pColInfoData = taosArrayGet(p->pDataBlock, index++);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, db, false);
code = colDataSetVal(pColInfoData, numOfRows, dbname, false);
QUERY_CHECK_CODE(code, lino, _end);
// vgroup_id

View File

@ -28,7 +28,7 @@
extern "C" {
#endif
typedef enum { MATCH, JUMP, SPLIT, RANGE } InstType;
typedef enum { INS_MATCH, INS_JUMP, INS_SPLIT, INS_RANGE } InstType;
typedef struct MatchValue {
#ifdef WINDOWS

View File

@ -159,14 +159,14 @@ bool dfaBuilderCacheState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *r
if (false == sparSetGet(set, i, &ip)) continue;
Inst *inst = taosArrayGet(builder->dfa->insts, ip);
if (inst->ty == JUMP || inst->ty == SPLIT) {
if (inst->ty == INS_JUMP || inst->ty == INS_SPLIT) {
continue;
} else if (inst->ty == RANGE) {
} else if (inst->ty == INS_RANGE) {
if (taosArrayPush(tinsts, &ip) == NULL) {
code = terrno;
goto _exception;
}
} else if (inst->ty == MATCH) {
} else if (inst->ty == INS_MATCH) {
isMatch = true;
if (taosArrayPush(tinsts, &ip) == NULL) {
code = terrno;
@ -234,11 +234,11 @@ void dfaAdd(FstDfa *dfa, FstSparseSet *set, uint32_t ip) {
}
bool succ = sparSetAdd(set, ip, NULL);
Inst *inst = taosArrayGet(dfa->insts, ip);
if (inst->ty == MATCH || inst->ty == RANGE) {
if (inst->ty == INS_MATCH || inst->ty == INS_RANGE) {
// do nothing
} else if (inst->ty == JUMP) {
} else if (inst->ty == INS_JUMP) {
dfaAdd(dfa, set, inst->jv.step);
} else if (inst->ty == SPLIT) {
} else if (inst->ty == INS_SPLIT) {
dfaAdd(dfa, set, inst->sv.len1);
dfaAdd(dfa, set, inst->sv.len2);
}
@ -253,11 +253,11 @@ bool dfaRun(FstDfa *dfa, FstSparseSet *from, FstSparseSet *to, uint8_t byte) {
if (false == sparSetGet(from, i, &ip)) continue;
Inst *inst = taosArrayGet(dfa->insts, ip);
if (inst->ty == JUMP || inst->ty == SPLIT) {
if (inst->ty == INS_JUMP || inst->ty == INS_SPLIT) {
continue;
} else if (inst->ty == MATCH) {
} else if (inst->ty == INS_MATCH) {
isMatch = true;
} else if (inst->ty == RANGE) {
} else if (inst->ty == INS_RANGE) {
if (inst->rv.start <= byte && byte <= inst->rv.end) {
dfaAdd(dfa, to, ip + 1);
}

View File

@ -17,6 +17,7 @@
#include "tglobal.h"
#include "tskiplist.h"
#include "tutil.h"
#include "indexFstDfa.h"
class UtilEnv : public ::testing::Test {
protected:
@ -41,6 +42,29 @@ class UtilEnv : public ::testing::Test {
SArray *rslt;
};
class UtilComm : public ::testing::Test {
protected:
virtual void SetUp() {
// src = (SArray *)taosArrayInit(2, sizeof(void *));
// for (int i = 0; i < 3; i++) {
// SArray *m = taosArrayInit(10, sizeof(uint64_t));
// taosArrayPush(src, &m);
// }
// rslt = (SArray *)taosArrayInit(10, sizeof(uint64_t));
}
virtual void TearDown() {
// for (int i = 0; i < taosArrayGetSize(src); i++) {
// SArray *m = (SArray *)taosArrayGetP(src, i);
// taosArrayDestroy(m);
// }
// taosArrayDestroy(src);
}
// SArray *src;
// SArray *rslt;
};
static void clearSourceArray(SArray *p) {
for (int i = 0; i < taosArrayGetSize(p); i++) {
SArray *m = (SArray *)taosArrayGetP(p, i);
@ -369,3 +393,35 @@ TEST_F(UtilEnv, testDictComm) {
EXPECT_EQ(COMMON_INPUTS[v], i);
}
}
TEST_F(UtilComm, testCompress) {
for (int32_t i = 0; i < 6; i++) {
_cache_range_compare cmpFunc = idxGetCompare((RangeType)(i));
//char[32]a = 0, b = 1;
char a[32] = {0};
char b[32] = {1};
for (int32_t j = 0; j < TSDB_DATA_TYPE_MAX; j++) {
cmpFunc(a, b, j);
}
}
}
TEST_F(UtilComm, testfstDfa) {
{
FstDfaBuilder *builder = dfaBuilderCreate(NULL);
ASSERT_TRUE(builder != NULL);
dfaBuilderDestroy(builder);
}
{
SArray *pInst = taosArrayInit(32, sizeof(uint8_t));
for (int32_t i = 0; i < 26; i++) {
uint8_t v = 'a' + i;
taosArrayPush(pInst, &v);
}
FstDfaBuilder *builder = dfaBuilderCreate(pInst);
FstDfa *dfa = dfaBuilderBuild(builder);
dfaBuilderDestroy(builder);
}
}

View File

@ -1841,6 +1841,9 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
}
if (TK_NK_QUESTION == pToken->type) {
if (!pCxt->pComCxt->isStmtBind && i != 0) {
return buildInvalidOperationMsg(&pCxt->msg, "not support mixed bind and non-bind values");
}
pCxt->isStmtBind = true;
pStmt->usingTableProcessing = true;
if (pCols->pColIndex[i] == tbnameIdx) {
@ -1874,6 +1877,9 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
return buildInvalidOperationMsg(&pCxt->msg, "not expected numOfBound");
}
} else {
if (pCxt->pComCxt->isStmtBind) {
return buildInvalidOperationMsg(&pCxt->msg, "not support mixed bind and non-bind values");
}
if (pCols->pColIndex[i] < numOfCols) {
const SSchema* pSchema = &pSchemas[pCols->pColIndex[i]];
SColVal* pVal = taosArrayGet(pStbRowsCxt->aColVals, pCols->pColIndex[i]);

View File

@ -10086,7 +10086,6 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt
const char* validConfigs[] = {
"encrypt_key",
tsAlterCompactTaskKeywords,
};
if (0 == strncasecmp(cfgReq.config, validConfigs[0], strlen(validConfigs[0]) + 1)) {
int32_t klen = strlen(cfgReq.value);
@ -10097,28 +10096,6 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt
ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
}
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_ENCRYPT_KEY, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq);
} else if (0 == strncasecmp(cfgReq.config, validConfigs[1], strlen(validConfigs[1]) + 1)) {
char* endptr = NULL;
int32_t maxCompactTasks = taosStr2Int32(cfgReq.value, &endptr, 10);
int32_t minMaxCompactTasks = MIN_MAX_COMPACT_TASKS;
int32_t maxMaxCompactTasks = MAX_MAX_COMPACT_TASKS;
// check format
if (endptr == cfgReq.value || endptr[0] != '\0') {
tFreeSMCfgDnodeReq(&cfgReq);
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_DNODE_INVALID_COMPACT_TASKS,
"Invalid max compact tasks: %s", cfgReq.value);
}
// check range
if (maxCompactTasks < minMaxCompactTasks || maxCompactTasks > maxMaxCompactTasks) {
tFreeSMCfgDnodeReq(&cfgReq);
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_DNODE_INVALID_COMPACT_TASKS,
"Invalid max compact tasks: %d, valid range [%d,%d]", maxCompactTasks,
minMaxCompactTasks, maxMaxCompactTasks);
}
code = buildCmdMsg(pCxt, TDMT_MND_CONFIG_DNODE, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq);
} else {
code = buildCmdMsg(pCxt, TDMT_MND_CONFIG_DNODE, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq);
}

View File

@ -5128,8 +5128,8 @@ static int32_t fltSclCollectOperatorFromNode(SNode *pNode, SArray *sclOpList) {
SOperatorNode *pOper = (SOperatorNode *)pNode;
SValueNode *valNode = (SValueNode *)pOper->pRight;
if (IS_NUMERIC_TYPE(valNode->node.resType.type) || valNode->node.resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
SExprNode* pLeft = (SExprNode*)pOper->pLeft;
if (IS_NUMERIC_TYPE(pLeft->resType.type) || pLeft->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
SNode* pLeft = NULL, *pRight = NULL;
int32_t code = nodesCloneNode(pOper->pLeft, &pLeft);
if (TSDB_CODE_SUCCESS != code) {

View File

@ -954,7 +954,6 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t n
int32_t vgId = pTask->pMeta->vgId;
if (pTmrInfo->launchChkptId != pActiveInfo->activeId) {
streamCleanBeforeQuitTmr(pTmrInfo, param);
stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64
", quit",
id, vgId, pTmrInfo->launchChkptId);
@ -963,13 +962,11 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t n
// active checkpoint info is cleared for now
if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) {
streamCleanBeforeQuitTmr(pTmrInfo, param);
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr", id, vgId);
return -1;
}
if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) {
streamCleanBeforeQuitTmr(pTmrInfo, param);
stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr", id,
vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num);
return -1;
@ -998,6 +995,7 @@ static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, in
void* p = taosArrayPush(pTmp, &pInfo->upstreamTaskId);
if (p == NULL) {
stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId);
taosArrayDestroy(pTmp);
return terrno;
} else {
stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, level,
@ -1047,13 +1045,13 @@ static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t
}
}
static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray* pNotRspList) {
static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray** pNotRspList) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
SArray* pList = pActiveInfo->pReadyMsgList;
int32_t num = taosArrayGetSize(pList);
int32_t vgId = pTask->pMeta->vgId;
int32_t checkpointId = pActiveInfo->activeId;
int64_t checkpointId = pActiveInfo->activeId;
const char* id = pTask->id.idStr;
int32_t notRsp = 0;
@ -1062,18 +1060,17 @@ static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray*
return code;
}
code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id);
code = doFindNotConfirmUpstream(pNotRspList, pList, num, vgId, pTask->info.taskLevel, id);
if (code) {
streamCleanBeforeQuitTmr(pTmrInfo, param);
stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr", id, tstrerror(code));
return code;
}
notRsp = taosArrayGetSize(pNotRspList);
notRsp = taosArrayGetSize(*pNotRspList);
if (notRsp == 0) {
streamClearChkptReadyMsg(pActiveInfo);
} else {
doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList);
doSendChkptReadyMsg(pTask, *pNotRspList, checkpointId, pList);
}
return code;
@ -1137,10 +1134,12 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) {
}
streamMutexLock(&pActiveInfo->lock);
code = chkptReadyMsgSendHelper(pTask, param, pNotRspList);
code = chkptReadyMsgSendHelper(pTask, param, &pNotRspList);
streamMutexUnlock(&pActiveInfo->lock);
if (code != TSDB_CODE_SUCCESS) {
streamCleanBeforeQuitTmr(pTmrInfo, param);
streamMetaReleaseTask(pTask->pMeta, pTask);
taosArrayDestroy(pNotRspList);
return;
@ -1176,7 +1175,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pList);
if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) {
stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d", id, num,
stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d not send chkpt-ready msg", id, num,
(int32_t)taosArrayGetSize(pTask->upstreamInfo.pList));
streamMutexUnlock(&pActiveInfo->lock);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
@ -1200,7 +1199,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id);
}
} else {
stError("s-task:%s failed to prepare the checkpoint-ready msg, try nex time in 10s", id);
stError("s-task:%s failed to prepare the checkpoint-ready msg, try next time in 10s", id);
}
}

View File

@ -915,8 +915,7 @@ int32_t streamResumeTask(SStreamTask* pTask) {
while (1) {
code = doStreamExecTask(pTask);
if (code) {
stError("s-task:%s failed to exec stream task, code:%s", id, tstrerror(code));
return code;
stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code));
}
// check if continue
streamMutexLock(&pTask->lock);

View File

@ -422,13 +422,11 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
SyncIndex minMatchIndex = SYNC_INDEX_INVALID;
if (pSyncNode->peersNum > 0) {
minMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
}
for (int32_t i = 1; i < pSyncNode->peersNum; ++i) {
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
if (matchIndex < minMatchIndex) {
if (minMatchIndex == SYNC_INDEX_INVALID) {
minMatchIndex = matchIndex;
} else if (matchIndex > 0 && matchIndex < minMatchIndex) {
minMatchIndex = matchIndex;
}
}

View File

@ -2023,16 +2023,29 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
memcpy(pKey, cd.pKey, (size_t)cd.kLen);
if (ppVal) {
// TODO: vLen may be zero
pVal = tdbRealloc(*ppVal, cd.vLen);
if (pVal == NULL) {
tdbFree(pKey);
return terrno;
if (cd.vLen > 0) {
pVal = tdbRealloc(*ppVal, cd.vLen);
if (pVal == NULL) {
tdbFree(pKey);
return terrno;
}
memcpy(pVal, cd.pVal, (size_t)cd.vLen);
if (TDB_CELLDECODER_FREE_VAL(&cd)) {
tdbTrace("tdb/btree-next decoder: %p pVal free: %p", &cd, cd.pVal);
tdbFree(cd.pVal);
}
} else {
pVal = NULL;
}
*ppVal = pVal;
*vLen = cd.vLen;
memcpy(pVal, cd.pVal, (size_t)cd.vLen);
} else {
if (TDB_CELLDECODER_FREE_VAL(&cd)) {
tdbTrace("tdb/btree-next2 decoder: %p pVal free: %p", &cd, cd.pVal);
tdbFree(cd.pVal);
}
}
ret = tdbBtcMoveToPrev(pBtc);

View File

@ -151,7 +151,6 @@ typedef struct SCliThrd {
TdThreadMutex msgMtx;
SDelayQueue* delayQueue;
SDelayQueue* timeoutQueue;
SDelayQueue* waitConnQueue;
uint64_t nextTimeout; // next timeout
STrans* pInst; //
@ -159,8 +158,6 @@ typedef struct SCliThrd {
SHashObj* fqdn2ipCache;
SCvtAddr* pCvtAddr;
SHashObj* failFastCache;
SHashObj* batchCache;
SHashObj* connHeapCache;
SCliReq* stopMsg;
@ -224,8 +221,6 @@ static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle);
SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
static void destroyCliConnQTable(SCliConn* conn);
static void cliHandleException(SCliConn* conn);
@ -1299,8 +1294,8 @@ static void cliHandleException(SCliConn* conn) {
if (conn->registered) {
int8_t ref = transGetRefCount(conn);
if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) {
// tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd,
// conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds);
// tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd,
// conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds);
uv_close((uv_handle_t*)conn->stream, cliDestroy);
}
}
@ -2124,144 +2119,7 @@ static void cliDoReq(queue* wq, SCliThrd* pThrd) {
tTrace("cli process batch size:%d", count);
}
}
SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt > pList->connMax || pList->sending > pList->connMax) {
return NULL;
}
queue* hr = QUEUE_HEAD(&pList->wq);
QUEUE_REMOVE(hr);
pList->sending += 1;
pList->len -= 1;
SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq);
return batch;
}
static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq);
static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port);
static void destroyBatchList(SCliBatchList* pList);
static void cliBuildBatch(SCliReq* pReq, queue* h, SCliThrd* pThrd) {
int32_t code = 0;
STrans* pInst = pThrd->pInst;
SReqCtx* pCtx = pReq->ctx;
char* ip = EPSET_GET_INUSE_IP(pCtx->epSet);
uint32_t port = EPSET_GET_INUSE_PORT(pCtx->epSet);
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
size_t klen = strlen(key);
SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen);
if (ppBatchList == NULL || *ppBatchList == NULL) {
SCliBatchList* pBatchList = NULL;
code = createBatchList(&pBatchList, key, ip, port);
if (code != 0) {
destroyReq(pReq);
return;
}
pBatchList->batchLenLimit = pInst->shareConnLimit;
SCliBatch* pBatch = NULL;
code = createBatch(&pBatch, pBatchList, pReq);
if (code != 0) {
destroyBatchList(pBatchList);
destroyReq(pReq);
return;
}
code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*));
if (code != 0) {
destroyBatchList(pBatchList);
}
} else {
if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
SCliBatch* pBatch = NULL;
code = createBatch(&pBatch, *ppBatchList, pReq);
if (code != 0) {
destroyReq(pReq);
cliDestroyBatch(pBatch);
}
} else {
queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
if ((pBatch->shareConnLimit + pReq->msg.contLen) < (*ppBatchList)->batchLenLimit) {
QUEUE_PUSH(&pBatch->wq, h);
pBatch->shareConnLimit += pReq->msg.contLen;
pBatch->wLen += 1;
} else {
SCliBatch* tBatch = NULL;
code = createBatch(&tBatch, *ppBatchList, pReq);
if (code != 0) {
destroyReq(pReq);
}
}
}
}
return;
}
static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) {
SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
if (pBatchList == NULL) {
tError("failed to create batch list since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return terrno;
}
QUEUE_INIT(&pBatchList->wq);
pBatchList->port = port;
pBatchList->connMax = 1;
pBatchList->connCnt = 0;
pBatchList->batchLenLimit = 0;
pBatchList->len += 1;
pBatchList->ip = taosStrdup(ip);
pBatchList->dst = taosStrdup(key);
if (pBatchList->ip == NULL || pBatchList->dst == NULL) {
taosMemoryFree(pBatchList->ip);
taosMemoryFree(pBatchList->dst);
taosMemoryFree(pBatchList);
tError("failed to create batch list since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return terrno;
}
*ppBatchList = pBatchList;
return 0;
}
static void destroyBatchList(SCliBatchList* pList) {
if (pList == NULL) {
return;
}
while (!QUEUE_IS_EMPTY(&pList->wq)) {
queue* h = QUEUE_HEAD(&pList->wq);
QUEUE_REMOVE(h);
SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq);
cliDestroyBatch(pBatch);
}
taosMemoryFree(pList->ip);
taosMemoryFree(pList->dst);
taosMemoryFree(pList);
}
static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq) {
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
if (pBatch == NULL) {
tError("failed to create batch since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return terrno;
}
QUEUE_INIT(&pBatch->wq);
QUEUE_INIT(&pBatch->listq);
QUEUE_PUSH(&pBatch->wq, &pReq->q);
pBatch->wLen += 1;
pBatch->shareConnLimit = pReq->msg.contLen;
pBatch->pList = pList;
QUEUE_PUSH(&pList->wq, &pBatch->listq);
pList->len += 1;
*ppBatch = pBatch;
return 0;
}
static void cliDoBatchReq(queue* wq, SCliThrd* pThrd) { return cliDoReq(wq, pThrd); }
static void cliAsyncCb(uv_async_t* handle) {
@ -2494,10 +2352,6 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
TAOS_CHECK_GOTO(code, NULL, _end);
}
if ((code = transDQCreate(pThrd->loop, &pThrd->waitConnQueue)) != 0) {
TAOS_CHECK_GOTO(code, NULL, _end);
}
pThrd->destroyAhandleFp = pInst->destroyFp;
pThrd->fqdn2ipCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
@ -2505,11 +2359,6 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
TAOS_CHECK_GOTO(terrno, NULL, _end);
}
pThrd->batchCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (pThrd->batchCache == NULL) {
TAOS_CHECK_GOTO(terrno, NULL, _end);
}
pThrd->connHeapCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (pThrd->connHeapCache == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
@ -2553,10 +2402,7 @@ _end:
transDQDestroy(pThrd->delayQueue, NULL);
transDQDestroy(pThrd->timeoutQueue, NULL);
transDQDestroy(pThrd->waitConnQueue, NULL);
taosHashCleanup(pThrd->fqdn2ipCache);
taosHashCleanup(pThrd->failFastCache);
taosHashCleanup(pThrd->batchCache);
taosHashCleanup(pThrd->pIdConnTable);
taosArrayDestroy(pThrd->pQIdBuf);
@ -2580,7 +2426,6 @@ static void destroyThrdObj(SCliThrd* pThrd) {
transDQDestroy(pThrd->delayQueue, destroyReqAndAhanlde);
transDQDestroy(pThrd->timeoutQueue, NULL);
transDQDestroy(pThrd->waitConnQueue, NULL);
tDebug("thread destroy %" PRId64, pThrd->pid);
for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
@ -2592,24 +2437,6 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosMemoryFree(pThrd->loop);
taosHashCleanup(pThrd->fqdn2ipCache);
void** pIter = taosHashIterate(pThrd->batchCache, NULL);
while (pIter != NULL) {
SCliBatchList* pBatchList = (SCliBatchList*)(*pIter);
while (!QUEUE_IS_EMPTY(&pBatchList->wq)) {
queue* h = QUEUE_HEAD(&pBatchList->wq);
QUEUE_REMOVE(h);
SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq);
cliDestroyBatch(pBatch);
}
taosMemoryFree(pBatchList->ip);
taosMemoryFree(pBatchList->dst);
taosMemoryFree(pBatchList);
pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
}
taosHashCleanup(pThrd->batchCache);
void* pIter2 = taosHashIterate(pThrd->connHeapCache, NULL);
while (pIter2 != NULL) {
SHeap* heap = (SHeap*)(pIter2);

View File

@ -615,6 +615,21 @@ TEST_F(TransEnv, http) {
#endif
}
#if 1
STelemAddrMgmt mgt;
taosTelemetryMgtInit(&mgt, "telemetry.taosdata.com");
int32_t code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, "test", strlen("test"),HTTP_FLAT);
printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr);
taosMsleep(2000);
code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT);
for (int32_t i = 0; i < 1; i++) {
code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT);
printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr);
taosMsleep(2000);
}
taosTelemetryDestroy(&mgt);
#endif
{
STelemAddrMgmt mgt;
taosTelemetryMgtInit(&mgt, "error");

View File

@ -208,28 +208,22 @@ static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver;
TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex));
SWalFileInfo *gloablPRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
if (gloablPRet == NULL) {
SWalFileInfo *globalRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
if (globalRet == NULL) {
wError("failed to find WAL log file with ver:%" PRId64, ver);
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
}
SWalFileInfo *pRet = taosMemoryMalloc(sizeof(SWalFileInfo));
if (pRet == NULL) {
wError("failed to allocate memory for localRet");
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(terrno);
}
TAOS_MEMCPY(pRet, gloablPRet, sizeof(SWalFileInfo));
SWalFileInfo ret;
TAOS_MEMCPY(&ret, globalRet, sizeof(SWalFileInfo));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
if (pReader->curFileFirstVer != pRet->firstVer) {
if (pReader->curFileFirstVer != ret.firstVer) {
// error code was set inner
TAOS_CHECK_RETURN_WITH_FREE(walReadChangeFile(pReader, pRet->firstVer), pRet);
TAOS_CHECK_RETURN(walReadChangeFile(pReader, ret.firstVer));
}
// error code was set inner
TAOS_CHECK_RETURN_WITH_FREE(walReadSeekFilePos(pReader, pRet->firstVer, ver), pRet);
taosMemoryFree(pRet);
TAOS_CHECK_RETURN(walReadSeekFilePos(pReader, ret.firstVer, ver));
wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, ver);
pReader->curVersion = ver;
@ -437,15 +431,15 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
seeked = true;
continue;
} else {
wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", from log file since %s",
pReader->pWal->cfg.vgId, ver, terrstr());
TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex));
if (contLen < 0) {
TAOS_RETURN(terrno);
code = terrno;
} else {
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
code = TSDB_CODE_WAL_FILE_CORRUPTED;
}
wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", from log file since %s",
pReader->pWal->cfg.vgId, ver, tstrerror(code));
TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex));
TAOS_RETURN(code);
}
}
@ -478,15 +472,15 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
}
if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, cryptedBodyLen)) != cryptedBodyLen) {
wError("vgId:%d, failed to read WAL record body, index:%" PRId64 ", from log file since %s",
pReader->pWal->cfg.vgId, ver, terrstr());
TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex));
if (contLen < 0) {
TAOS_RETURN(terrno);
code = terrno;
} else {
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
code = TSDB_CODE_WAL_FILE_CORRUPTED;
}
wError("vgId:%d, failed to read WAL record body, index:%" PRId64 ", from log file since %s",
pReader->pWal->cfg.vgId, ver, tstrerror(code));
TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex));
TAOS_RETURN(code);
}
if (pReader->pHead->head.version != ver) {

View File

@ -20,7 +20,7 @@
#ifdef USE_ANALYTICS
#include <curl/curl.h>
#define ANAL_ALGO_SPLIT ","
#define ANALYTICS_ALOG_SPLIT_CHAR ","
typedef struct {
int64_t ver;
@ -136,7 +136,7 @@ bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue,
return false;
}
pEnd = strstr(pStart, ANAL_ALGO_SPLIT);
pEnd = strstr(pStart, ANALYTICS_ALOG_SPLIT_CHAR);
if (optMaxLen > 0) {
if (pEnd > pStart) {
int32_t len = (int32_t)(pEnd - pStart);
@ -168,7 +168,7 @@ bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValu
int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName);
char *pos1 = strstr(option, buf);
char *pos2 = strstr(option, ANAL_ALGO_SPLIT);
char *pos2 = strstr(option, ANALYTICS_ALOG_SPLIT_CHAR);
if (pos1 != NULL) {
*optValue = taosStr2Int64(pos1 + bufLen, NULL, 10);
return true;

View File

@ -673,11 +673,11 @@ static int32_t taosInitNormalLog(const char *logName, int32_t maxFileNum) {
}
_exit:
taosUnLockLogFile(tsLogObj.logHandle->pFile);
if (code != 0) {
taosUnLockLogFile(tsLogObj.logHandle->pFile);
TAOS_UNUSED(printf("failed to init normal log file:%s at line %d, reason:%s\n", name, lino, tstrerror(code)));
}
return 0;
return code;
}
static void taosUpdateLogNums(ELogLevel level) {

View File

@ -100,7 +100,7 @@ class TDSql:
if drop:
s = f'drop database if exists {dbname}'
self.cursor.execute(s)
s = f'create database {dbname}'
s = f'create database {dbname} stt_trigger 1'
for k, v in kwargs.items():
s += f" {k} {v}"
if "duration" not in kwargs:

View File

@ -5,8 +5,8 @@
#include <unistd.h>
#include "taos.h"
int CTB_NUMS = 2;
int ROW_NUMS = 2;
int CTB_NUMS = 1;
int ROW_NUMS = 1;
int CYC_NUMS = 2;
void do_query(TAOS* taos, const char* sql) {
@ -217,7 +217,7 @@ int main() {
exit(1);
}
do_stmt(taos, "insert into db.stb(tbname,ts,b,t1,t2) values(?,?,?,?,?)");
do_stmt(taos, "insert into `db`.`stb` (tbname,ts,b,t1,t2) values(?,?,?,?,?)");
// do_stmt(taos, "insert into db.? using db.stb tags(?,?)values(?,?)");
// do_taosc(taos);
taos_close(taos);

View File

@ -13,7 +13,7 @@ if [ -n "$PID" ]; then
systemctl stop taosd
fi
PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'`
PID=`ps -ef|grep -w taosd | grep -v grep | grep -v taosanode | awk '{print $2}'`
while [ -n "$PID" ]; do
echo kill -9 $PID
#pkill -9 taosd
@ -38,10 +38,10 @@ while [ -n "$PID" ]; do
else
lsof -nti:6030 | xargs kill -9
fi
PID=`ps -ef|grep -w taos | grep -v grep | awk '{print $2}'`
PID=`ps -ef|grep -w taos | grep -v grep |grep -v taosanode| awk '{print $2}'`
done
PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'`
PID=`ps -ef|grep -w tmq_sim | grep -v grep | grep -v taosanode|awk '{print $2}'`
while [ -n "$PID" ]; do
echo kill -9 $PID
#pkill -9 tmq_sim
@ -52,5 +52,5 @@ while [ -n "$PID" ]; do
else
lsof -nti:6030 | xargs kill -9
fi
PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'`
PID=`ps -ef|grep -w tmq_sim | grep -v grep | grep -v taosanode| awk '{print $2}'`
done

View File

@ -3,7 +3,17 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
print =============== create anode
print =============== failed to create anode on '127.0.0.1:1101'
sql_error create anode '127.0.0.1:1101'
sql show anodes
if $rows != 0 then
return -1
endi
sql_error drop anode 1
print ================ create anode
sql create anode '192.168.1.116:6050'
sql show anodes
@ -30,7 +40,7 @@ print $data00 $data01 $data02
sql use d0
print =============== create super table, include column type for count/sum/min/max/first
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned)
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double, c4 tinyint, c5 bigint, c6 varchar(12)) tags (t1 int unsigned)
sql show stables
if $rows != 1 then
@ -42,10 +52,11 @@ sql create table ct1 using stb tags(1000)
print ==================== insert data
# input_list = [5, 14, 15, 15, 14, 19, 17, 16, 20, 22, 8, 21, 28, 11, 9, 29, 40]
sql insert into ct1(ts, c1) values(now-1a, 5)(now+1a, 14)(now+2a, 15)(now+3a, 15)(now+4a, 14)
sql insert into ct1(ts, c1) values(now+5a, 19)(now+6a, 17)(now+7a, 16)(now+8a, 20)(now+9a, 22)
sql insert into ct1(ts, c1) values(now+10a, 8)(now+11a, 21)(now+12a, 28)(now+13a, 11)(now+14a, 9)
sql insert into ct1(ts, c1) values(now+15a, 29)(now+16a, 40)
sql insert into ct1(ts, c1, c2, c3, c4, c5, c6) values(now-1a, 5, 5, 5, 5, 5, 'a')(now+1a, 14, 14, 14, 14, 14, 'a')(now+2a, 15, 15, 15, 15, 15, 'a')
sql insert into ct1 values(now+3a, 15, 15, 15, 15, 15, 'a')(now+4a, 14, 14, 14, 14, 14, 'a')(now+5a, 19, 19, 19, 19, 19, 'a')(now+6a, 17, 17, 17, 17, 17, 'a')
sql insert into ct1 values(now+7a, 16, 16, 16, 16, 16, 'a')(now+8a, 20, 20, 20, 20, 20, 'a')(now+9a, 22, 22, 22, 22, 22, 'a')
sql insert into ct1 values(now+10a, 8, 8, 8, 8, 8, 'a')(now+11a, 21, 21, 21, 21, 21, 'a')(now+12a, 28, 28, 28, 28, 28, 'a')(now+13a, 11, 11, 11, 11, 11, 'a')(now+14a, 9, 9, 9, 9, 9, 'a')
sql insert into ct1 values(now+15a, 29, 29, 29, 29, 29, 'a')(now+16a, 40, 40, 40, 40, 40, 'a')
sql select count(*) from ct1
if $data00 != 17 then
@ -58,6 +69,87 @@ if $data00 != 1 then
return -1
endi
print ================= try every loaded anomaly detection algorithm
sql select count(*) from ct1 anomaly_window(c1, 'algo=iqr');
sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma');
sql select count(*) from ct1 anomaly_window(c1, 'algo=lof');
sql select count(*) from ct1 anomaly_window(c1, 'algo=shesd');
sql select count(*) from ct1 anomaly_window(c1, 'algo=grubbs');
print ================= try every column type of column
sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma,k=2');
sql select count(*) from ct1 anomaly_window(c2, 'algo=ksigma,k=2');
sql select count(*) from ct1 anomaly_window(c3, 'algo=ksigma,k=2');
sql select count(*) from ct1 anomaly_window(c4, 'algo=ksigma,k=2');
sql select count(*) from ct1 anomaly_window(c5, 'algo=ksigma,k=2');
print =================== invalid column type
sql_error select count(*) from ct1 anomaly_window(c6, 'algo=ksigma,k=2');
sql_error select forecast(c6, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=119,wncheck=1,period=0') from ct1
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters1,conf=0.5,wncheck=1,period=0') from ct1
sql_error select forecast(c1, 'conf=50 ,algo = arima, rows=0') from ct1
sql_error select forecast(c1, 'conf=50 ,algo = arima, rows=-10') from ct1
sql_error select forecast(c1, 'conf=50 ,algo = arima, every=0') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters, conf=50 ') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, ' algo=holtwinters , conf=50 ') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, ' algo = holtwinters , conf = 50 ') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, ') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, ,') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, , ,') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, a =') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, = a ,') from ct1
print =================== valid column type
sql select forecast(c1, 'conf=50 ,algo = arima') from ct1
sql select forecast(c1, 'conf=50 ,algo = arima, rows=1') from ct1
sql select forecast(c2, 'conf=50 ,algo = arima, rows=1') from ct1
sql select forecast(c3, 'conf=50 ,algo = arima, rows=1') from ct1
sql select forecast(c4, 'conf=50 ,algo = arima, rows=1') from ct1
sql select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1
sql select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=2') from ct1
if $rows != 10 then
return -1
endi
if $data03 != 28 then
return -1
endi
if $data00 != @23-11-15 06:13:20.000@ then
print expect 23-11-15 06:13:20.000 , actual $data00
return -1
endi
if $data10 != @23-11-15 06:13:20.002@ then
print expect 23-11-15 06:13:20.002 , actual $data10
return -1
endi
if $data20 != @23-11-15 06:13:20.004@ then
return -1
endi
print test the every option and rows option
sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=100,rows=5') from ct1
if $rows != 5 then
return -1
endi
if $data00 != @23-11-15 06:13:20.000@ then
return -1
endi
if $data10 != @23-11-15 06:13:20.100@ then
return -1
endi
sql drop anode 1
sql show anodes
@ -66,6 +158,13 @@ if $rows != 0 then
return -1
endi
sleep 1000
print ===================== query without anodes
sql_error select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1
sql_error select count(*) from ct1 anomaly_window(c1, 'algo=iqr');
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check

View File

@ -39,6 +39,27 @@ class TDTestCase:
self.create_tables();
self.ts = 1500000000000
def test_TD_33336(self):
sql = "flush database db"
tdSql.execute(sql)
time.sleep(5)
sql = f'select last(ts) + 1d, last(ts) - 1d from db.t1'
tdSql.query(sql, queryTimes=1)
lastTs_add1d = tdSql.queryResult[0][0]
lastTs_sub1d = tdSql.queryResult[0][1]
sql = f'select count(*) from db.t1 where ts < "{lastTs_add1d}" and vc1 = 1'
tdSql.query(sql, queryTimes=1)
all_row_count = tdSql.queryResult[0][0]
tdLog.debug(f"all rows: {all_row_count}")
sql = f'select count(*) from db.t1 where ts < "{lastTs_sub1d}" and vc1 = 1'
tdSql.query(sql, queryTimes=1)
row_count_sub1d = tdSql.queryResult[0][0]
tdLog.debug(f"row_count_sub1d: {row_count_sub1d}")
if row_count_sub1d > all_row_count:
tdLog.exit(f' err rows returned for sql: {sql} row_count_sub1d: {row_count_sub1d} > all_row_count: {all_row_count}')
# run case
def run(self):
@ -53,6 +74,8 @@ class TDTestCase:
# self.test_case2()
tdLog.debug(" LIMIT test_case2 ............ [OK]")
self.test_TD_33336()
# stop
def stop(self):
tdSql.close()
@ -65,11 +88,11 @@ class TDTestCase:
# create table
def create_tables(self, dbname="db"):
# super table
tdSql.execute(f"create table {dbname}.st(ts timestamp, i1 int,i2 int) tags(area int)")
tdSql.execute(f"create table {dbname}.st(ts timestamp, i1 int,i2 int, vc1 varchar(255)) tags(area int)")
# child table
tdSql.execute(f"create table {dbname}.t1 using {dbname}.st tags(1)")
tdSql.execute(f"create table {dbname}.st1(ts timestamp, i1 int ,i2 int) tags(area int) sma(i2) ")
tdSql.execute(f"create table {dbname}.st1(ts timestamp, i1 int ,i2 int, vc1 varchar(255)) tags(area int) sma(i2) ")
tdSql.execute(f"create table {dbname}.t4 using {dbname}.st1 tags(1)")
return
@ -98,7 +121,11 @@ class TDTestCase:
sql = pre_insert
tdLog.debug("insert table %s rows=%d ..." % (tbname, count))
for i in range(count):
sql += " (%d,%d,%d)" % (ts_start + i*1000, i, i+1)
if random.randint(0, 4) > 2:
tail = ''
else:
tail = 'asd'
sql += " (%d,%d,%d,%s)" % (ts_start + i*1000, i, i+1, '"' + str(random.randint(0,5)) + f'{tail}"')
if i > 0 and i % 20000 == 0:
tdLog.info("%d rows inserted" % i)
tdSql.execute(sql)
@ -125,4 +152,4 @@ class TDTestCase:
# add case with filename
#
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())