Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-30967

This commit is contained in:
54liuyao 2024-07-22 09:27:41 +08:00
commit b038b655da
30 changed files with 830 additions and 784 deletions

View File

@ -416,7 +416,7 @@ pipeline {
echo "${WKDIR}/restore.sh -p ${BRANCH_NAME} -n ${BUILD_ID} -c {container name}" echo "${WKDIR}/restore.sh -p ${BRANCH_NAME} -n ${BUILD_ID} -c {container name}"
} }
catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') { catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') {
timeout(time: 150, unit: 'MINUTES'){ timeout(time: 200, unit: 'MINUTES'){
pre_test() pre_test()
script { script {
sh ''' sh '''

View File

@ -335,7 +335,7 @@ SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* nam
void destroyQueryExecRes(SExecResult* pRes); void destroyQueryExecRes(SExecResult* pRes);
int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t* len); int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t* len);
char* parseTagDatatoJson(void* p); void parseTagDatatoJson(void* p, char** jsonStr);
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst); int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst);
void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType); void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType);
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst); int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst);

View File

@ -53,7 +53,7 @@ int32_t taosGetErrSize();
#define terrln (*taosGetErrln()) #define terrln (*taosGetErrln())
#define SET_ERROR_MSG(MSG, ...) \ #define SET_ERROR_MSG(MSG, ...) \
snprintf(terrMsg, ERR_MSG_LEN, MSG, ##__VA_ARGS__) (void)snprintf(terrMsg, ERR_MSG_LEN, MSG, ##__VA_ARGS__)
#define TSDB_CODE_SUCCESS 0 #define TSDB_CODE_SUCCESS 0
#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error #define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error
@ -556,6 +556,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER TAOS_DEF_ERROR_CODE(0, 0x061B) #define TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER TAOS_DEF_ERROR_CODE(0, 0x061B)
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x061C) #define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x061C)
#define TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE TAOS_DEF_ERROR_CODE(0, 0x061D) #define TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE TAOS_DEF_ERROR_CODE(0, 0x061D)
#define TSDB_CODE_TDB_INCONSISTENT_DB_ID TAOS_DEF_ERROR_CODE(0, 0x061E)
// query // query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) #define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)

View File

@ -2007,7 +2007,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L); sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(dst, strlen(varDataVal(dst))); varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (tTagIsJson(data)) { } else if (tTagIsJson(data)) {
char* jsonString = parseTagDatatoJson(data); char* jsonString = NULL;
parseTagDatatoJson(data, &jsonString);
STR_TO_VARSTR(dst, jsonString); STR_TO_VARSTR(dst, jsonString);
taosMemoryFree(jsonString); taosMemoryFree(jsonString);
} else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value" } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value"

File diff suppressed because it is too large Load Diff

View File

@ -23,11 +23,11 @@ TARGET_LINK_LIBRARIES(
PUBLIC os util common transport parser catalog scheduler function gtest taos_static qcom geometry PUBLIC os util common transport parser catalog scheduler function gtest taos_static qcom geometry
) )
ADD_EXECUTABLE(clientMonitorTest clientMonitorTests.cpp) #ADD_EXECUTABLE(clientMonitorTest clientMonitorTests.cpp)
TARGET_LINK_LIBRARIES( #TARGET_LINK_LIBRARIES(
clientMonitorTest # clientMonitorTest
PUBLIC os util common transport monitor parser catalog scheduler function gtest taos_static qcom executor # PUBLIC os util common transport monitor parser catalog scheduler function gtest taos_static qcom executor
) #)
TARGET_INCLUDE_DIRECTORIES( TARGET_INCLUDE_DIRECTORIES(
clientTest clientTest
@ -47,11 +47,11 @@ TARGET_INCLUDE_DIRECTORIES(
PRIVATE "${TD_SOURCE_DIR}/source/client/inc" PRIVATE "${TD_SOURCE_DIR}/source/client/inc"
) )
TARGET_INCLUDE_DIRECTORIES( #TARGET_INCLUDE_DIRECTORIES(
clientMonitorTest # clientMonitorTest
PUBLIC "${TD_SOURCE_DIR}/include/client/" # PUBLIC "${TD_SOURCE_DIR}/include/client/"
PRIVATE "${TD_SOURCE_DIR}/source/client/inc" # PRIVATE "${TD_SOURCE_DIR}/source/client/inc"
) #)
add_test( add_test(
NAME smlTest NAME smlTest

View File

@ -10828,7 +10828,9 @@ int32_t tDecodeMqBatchMetaRsp(SDecoder *pDecoder, SMqBatchMetaRsp *pRsp) {
if (tDecodeI32(pDecoder, &size) < 0) return -1; if (tDecodeI32(pDecoder, &size) < 0) return -1;
if (size > 0) { if (size > 0) {
pRsp->batchMetaReq = taosArrayInit(size, POINTER_BYTES); pRsp->batchMetaReq = taosArrayInit(size, POINTER_BYTES);
if (!pRsp->batchMetaReq) return -1;
pRsp->batchMetaLen = taosArrayInit(size, sizeof(int32_t)); pRsp->batchMetaLen = taosArrayInit(size, sizeof(int32_t));
if (!pRsp->batchMetaLen) return -1;
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
void *pCreate = NULL; void *pCreate = NULL;
uint64_t len = 0; uint64_t len = 0;

View File

@ -103,31 +103,31 @@ int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const
int32_t code = TDB_CODE_SUCCESS; int32_t code = TDB_CODE_SUCCESS;
TXN* txn = NULL; TXN* txn = NULL;
TQ_ERR_RETURN(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); TQ_ERR_GO_TO_END(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_RETURN(tdbTbUpsert(ttb, key, kLen, value, vLen, txn)); TQ_ERR_GO_TO_END(tdbTbUpsert(ttb, key, kLen, value, vLen, txn));
TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn)); TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn));
TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn)); TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn));
return 0; return 0;
END:
tdbAbort(pTq->pMetaDB, txn);
return code;
} }
int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen) { int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen) {
int32_t code = TDB_CODE_SUCCESS; int32_t code = TDB_CODE_SUCCESS;
TXN* txn = NULL; TXN* txn = NULL;
TQ_ERR_RETURN(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); TQ_ERR_GO_TO_END(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_RETURN(tdbTbDelete(ttb, key, kLen, txn)); TQ_ERR_GO_TO_END(tdbTbDelete(ttb, key, kLen, txn));
TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn)); TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn));
TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn)); TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn));
return 0; return 0;
}
static int32_t tqMetaTransformOffsetInfo(STQ* pTq, char* path) {
int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_RETURN(tqOffsetRestoreFromFile(pTq, path));
END: END:
tdbAbort(pTq->pMetaDB, txn);
return code; return code;
} }
@ -408,7 +408,8 @@ END:
} }
int32_t tqMetaOpen(STQ* pTq) { int32_t tqMetaOpen(STQ* pTq) {
char* maindb = NULL; char* maindb = NULL;
char* offsetNew = NULL;
int32_t code = TDB_CODE_SUCCESS; int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME)); TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME));
if(!taosCheckExistFile(maindb)){ if(!taosCheckExistFile(maindb)){
@ -416,12 +417,20 @@ int32_t tqMetaOpen(STQ* pTq) {
TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq)); TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));
}else{ }else{
TQ_ERR_GO_TO_END(tqMetaTransform(pTq)); TQ_ERR_GO_TO_END(tqMetaTransform(pTq));
taosRemoveFile(maindb); (void)taosRemoveFile(maindb);
} }
TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
if(taosCheckExistFile(offsetNew)){
TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetNew));
(void)taosRemoveFile(offsetNew);
}
TQ_ERR_GO_TO_END(tqMetaRestoreCheckInfo(pTq)); TQ_ERR_GO_TO_END(tqMetaRestoreCheckInfo(pTq));
END: END:
taosMemoryFree(maindb); taosMemoryFree(maindb);
taosMemoryFree(offsetNew);
return code; return code;
} }
@ -445,14 +454,14 @@ int32_t tqMetaTransform(STQ* pTq) {
TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore)); TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore));
TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME)); TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
if(taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0){ if(taosCheckExistFile(offset)) {
tqError("copy offset file error"); if (taosCopyFile(offset, offsetNew) < 0) {
tqError("copy offset file error");
} else {
(void)taosRemoveFile(offset);
}
} }
TQ_ERR_GO_TO_END(tqMetaTransformOffsetInfo(pTq, offsetNew));
(void)taosRemoveFile(offset);
(void)taosRemoveFile(offsetNew);
END: END:
taosMemoryFree(offset); taosMemoryFree(offset);
taosMemoryFree(offsetNew); taosMemoryFree(offsetNew);

View File

@ -73,6 +73,7 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) {
} }
TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, offset.subKey, strlen(offset.subKey), pMemBuf, size)); TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, offset.subKey, strlen(offset.subKey), pMemBuf, size));
tqInfo("tq: offset restore from file to tdb, subkey:%s", offset.subKey);
taosMemoryFree(pMemBuf); taosMemoryFree(pMemBuf);
pMemBuf = NULL; pMemBuf = NULL;
} }

View File

@ -365,6 +365,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
code = streamTrySchedExec(pTask); code = streamTrySchedExec(pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
taosArrayDestroy(pTaskList);
return -1; return -1;
} }
} }

View File

@ -190,10 +190,10 @@ static int32_t vnodeAsyncCancelAllTasks(SVAsync *async, SArray *cancelArray) {
task->prev->next = task->next; task->prev->next = task->next;
task->next->prev = task->prev; task->next->prev = task->prev;
if (task->cancel) { if (task->cancel) {
taosArrayPush(cancelArray, &(SVATaskCancelInfo){ TAOS_UNUSED(taosArrayPush(cancelArray, &(SVATaskCancelInfo){
.cancel = task->cancel, .cancel = task->cancel,
.arg = task->arg, .arg = task->arg,
}); }));
} }
vnodeAsyncTaskDone(async, task); vnodeAsyncTaskDone(async, task);
} }
@ -206,6 +206,9 @@ static void *vnodeAsyncLoop(void *arg) {
SVWorker *worker = (SVWorker *)arg; SVWorker *worker = (SVWorker *)arg;
SVAsync *async = worker->async; SVAsync *async = worker->async;
SArray *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo)); SArray *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo));
if (cancelArray == NULL) {
return NULL;
}
setThreadName(async->label); setThreadName(async->label);
@ -466,7 +469,7 @@ int32_t vnodeAsyncOpen(int32_t numOfThreads) {
vnodeAsyncSetWorkers(2, numOfThreads); vnodeAsyncSetWorkers(2, numOfThreads);
_exit: _exit:
return 0; return code;
} }
int32_t vnodeAsyncClose() { int32_t vnodeAsyncClose() {
@ -748,10 +751,10 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
task->prev->next = task->next; task->prev->next = task->next;
task->next->prev = task->prev; task->next->prev = task->prev;
if (task->cancel) { if (task->cancel) {
taosArrayPush(cancelArray, &(SVATaskCancelInfo){ TAOS_UNUSED(taosArrayPush(cancelArray, &(SVATaskCancelInfo){
.cancel = task->cancel, .cancel = task->cancel,
.arg = task->arg, .arg = task->arg,
}); }));
} }
vnodeAsyncTaskDone(async, task); vnodeAsyncTaskDone(async, task);
} }
@ -763,10 +766,10 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
channel->scheduled->prev->next = channel->scheduled->next; channel->scheduled->prev->next = channel->scheduled->next;
channel->scheduled->next->prev = channel->scheduled->prev; channel->scheduled->next->prev = channel->scheduled->prev;
if (channel->scheduled->cancel) { if (channel->scheduled->cancel) {
taosArrayPush(cancelArray, &(SVATaskCancelInfo){ TAOS_UNUSED(taosArrayPush(cancelArray, &(SVATaskCancelInfo){
.cancel = channel->scheduled->cancel, .cancel = channel->scheduled->cancel,
.arg = channel->scheduled->arg, .arg = channel->scheduled->arg,
}); }));
} }
vnodeAsyncTaskDone(async, channel->scheduled); vnodeAsyncTaskDone(async, channel->scheduled);
} }

View File

@ -16,13 +16,12 @@
#include "vnd.h" #include "vnd.h"
/* ------------------------ STRUCTURES ------------------------ */ /* ------------------------ STRUCTURES ------------------------ */
static int vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPool **ppPool) { static int32_t vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPool **ppPool) {
SVBufPool *pPool; SVBufPool *pPool;
pPool = taosMemoryMalloc(sizeof(SVBufPool) + size); pPool = taosMemoryMalloc(sizeof(SVBufPool) + size);
if (pPool == NULL) { if (pPool == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
memset(pPool, 0, sizeof(SVBufPool)); memset(pPool, 0, sizeof(SVBufPool));
@ -44,14 +43,12 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPoo
pPool->lock = taosMemoryMalloc(sizeof(TdThreadSpinlock)); pPool->lock = taosMemoryMalloc(sizeof(TdThreadSpinlock));
if (!pPool->lock) { if (!pPool->lock) {
taosMemoryFree(pPool); taosMemoryFree(pPool);
terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
if (taosThreadSpinInit(pPool->lock, 0) != 0) { if (taosThreadSpinInit(pPool->lock, 0) != 0) {
taosMemoryFree((void *)pPool->lock); taosMemoryFree((void *)pPool->lock);
taosMemoryFree(pPool); taosMemoryFree(pPool);
terrno = TAOS_SYSTEM_ERROR(errno); return terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
} }
} else { } else {
pPool->lock = NULL; pPool->lock = NULL;
@ -77,10 +74,11 @@ int vnodeOpenBufPool(SVnode *pVnode) {
for (int i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) { for (int i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) {
// create pool // create pool
if (vnodeBufPoolCreate(pVnode, i, size, &pVnode->aBufPool[i])) { int32_t code;
if ((code = vnodeBufPoolCreate(pVnode, i, size, &pVnode->aBufPool[i]))) {
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
vnodeCloseBufPool(pVnode); vnodeCloseBufPool(pVnode);
return -1; return code;
} }
// add to free list // add to free list
@ -274,8 +272,6 @@ _exit:
} }
int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) { int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) {
int32_t code = 0;
taosThreadMutexLock(&pPool->mutex); taosThreadMutexLock(&pPool->mutex);
pQNode->pNext = pPool->qList.pNext; pQNode->pNext = pPool->qList.pNext;
@ -285,9 +281,7 @@ int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) {
pPool->nQuery++; pPool->nQuery++;
taosThreadMutexUnlock(&pPool->mutex); taosThreadMutexUnlock(&pPool->mutex);
return 0;
_exit:
return code;
} }
void vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode, bool proactive) { void vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode, bool proactive) {

View File

@ -394,8 +394,7 @@ int vnodeValidateTableHash(SVnode *pVnode, char *tableFName) {
} }
if (hashValue < pVnode->config.hashBegin || hashValue > pVnode->config.hashEnd) { if (hashValue < pVnode->config.hashBegin || hashValue > pVnode->config.hashEnd) {
terrno = TSDB_CODE_VND_HASH_MISMATCH; return terrno = TSDB_CODE_VND_HASH_MISMATCH;
return -1;
} }
return 0; return 0;

View File

@ -129,17 +129,17 @@ void initTqAPI(SStoreTqReader* pTq) {
pTq->tqReaderIsQueriedTable = tqReaderIsQueriedTable; pTq->tqReaderIsQueriedTable = tqReaderIsQueriedTable;
pTq->tqReaderCurrentBlockConsumed = tqCurrentBlockConsumed; pTq->tqReaderCurrentBlockConsumed = tqCurrentBlockConsumed;
pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it
// pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it // pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it
pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it
pTq->tqGetResultBlock = tqGetResultBlock; pTq->tqGetResultBlock = tqGetResultBlock;
// pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut; // pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
pTq->tqGetResultBlockTime = tqGetResultBlockTime; pTq->tqGetResultBlockTime = tqGetResultBlockTime;
pTq->tqGetStreamExecProgress = tqGetStreamExecInfo; pTq->tqGetStreamExecProgress = tqGetStreamExecInfo;
} }
void initStateStoreAPI(SStateStore* pStore) { void initStateStoreAPI(SStateStore* pStore) {
pStore->streamFileStateInit = streamFileStateInit; pStore->streamFileStateInit = streamFileStateInit;

View File

@ -19,31 +19,19 @@
static volatile int32_t VINIT = 0; static volatile int32_t VINIT = 0;
int vnodeInit(int nthreads) { int vnodeInit(int nthreads) {
int32_t init; if (atomic_val_compare_exchange_32(&VINIT, 0, 1)) {
init = atomic_val_compare_exchange_32(&VINIT, 0, 1);
if (init) {
return 0; return 0;
} }
if (vnodeAsyncOpen(nthreads) != 0) { TAOS_CHECK_RETURN(vnodeAsyncOpen(nthreads));
return -1; TAOS_CHECK_RETURN(walInit());
}
if (walInit() < 0) {
return -1;
}
return 0; return 0;
} }
void vnodeCleanup() { void vnodeCleanup() {
int32_t init = atomic_val_compare_exchange_32(&VINIT, 1, 0); if (atomic_val_compare_exchange_32(&VINIT, 1, 0) == 0) return;
if (init == 0) return;
// set stop
vnodeAsyncClose(); vnodeAsyncClose();
walCleanUp(); walCleanUp();
smaCleanUp(); smaCleanUp();
} }

View File

@ -446,7 +446,8 @@ int32_t ctgGetTbTag(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName,
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
char* pJson = parseTagDatatoJson(pTag); char* pJson = NULL;
parseTagDatatoJson(pTag, &pJson);
STagVal tagVal; STagVal tagVal;
tagVal.cid = 0; tagVal.cid = 0;
tagVal.type = TSDB_DATA_TYPE_JSON; tagVal.type = TSDB_DATA_TYPE_JSON;

View File

@ -2079,7 +2079,8 @@ int32_t ctgHandleGetTbTagRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf*
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
char* pJson = parseTagDatatoJson(pTag); char* pJson = NULL;
parseTagDatatoJson(pTag, &pJson);
STagVal tagVal; STagVal tagVal;
tagVal.cid = 0; tagVal.cid = 0;
tagVal.type = TSDB_DATA_TYPE_JSON; tagVal.type = TSDB_DATA_TYPE_JSON;

View File

@ -503,11 +503,10 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
} }
if (tTagIsJson(pTag)) { if (tTagIsJson(pTag)) {
char* pJson = parseTagDatatoJson(pTag); char* pJson = NULL;
if (pJson) { parseTagDatatoJson(pTag, &pJson);
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s", pJson); *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s", pJson);
taosMemoryFree(pJson); taosMemoryFree(pJson);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -966,7 +966,8 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
char* tagVarChar = NULL; char* tagVarChar = NULL;
if (tagData != NULL) { if (tagData != NULL) {
if (tagType == TSDB_DATA_TYPE_JSON) { if (tagType == TSDB_DATA_TYPE_JSON) {
char* tagJson = parseTagDatatoJson(tagData); char* tagJson = NULL;
parseTagDatatoJson(tagData, &tagJson);
tagVarChar = taosMemoryMalloc(strlen(tagJson) + VARSTR_HEADER_SIZE); tagVarChar = taosMemoryMalloc(strlen(tagJson) + VARSTR_HEADER_SIZE);
memcpy(varDataVal(tagVarChar), tagJson, strlen(tagJson)); memcpy(varDataVal(tagVarChar), tagJson, strlen(tagJson));
varDataSetLen(tagVarChar, strlen(tagJson)); varDataSetLen(tagVarChar, strlen(tagJson));

View File

@ -417,7 +417,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
char* parseTagDatatoJson(void* p) { void parseTagDatatoJson(void* p, char** jsonStr) {
char* string = NULL; char* string = NULL;
SArray* pTagVals = NULL; SArray* pTagVals = NULL;
cJSON* json = NULL; cJSON* json = NULL;
@ -436,6 +436,9 @@ char* parseTagDatatoJson(void* p) {
} }
for (int j = 0; j < nCols; ++j) { for (int j = 0; j < nCols; ++j) {
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j); STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
if (pTagVal == NULL) {
continue;
}
// json key encode by binary // json key encode by binary
tstrncpy(tagJsonKey, pTagVal->pKey, sizeof(tagJsonKey)); tstrncpy(tagJsonKey, pTagVal->pKey, sizeof(tagJsonKey));
// json value // json value
@ -445,11 +448,16 @@ char* parseTagDatatoJson(void* p) {
if (value == NULL) { if (value == NULL) {
goto end; goto end;
} }
cJSON_AddItemToObject(json, tagJsonKey, value); if(!cJSON_AddItemToObject(json, tagJsonKey, value)){
goto end;
}
} else if (type == TSDB_DATA_TYPE_NCHAR) { } else if (type == TSDB_DATA_TYPE_NCHAR) {
cJSON* value = NULL; cJSON* value = NULL;
if (pTagVal->nData > 0) { if (pTagVal->nData > 0) {
char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1); char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
if (tagJsonValue == NULL) {
goto end;
}
int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue); int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue);
if (length < 0) { if (length < 0) {
qError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, qError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
@ -464,25 +472,34 @@ char* parseTagDatatoJson(void* p) {
} }
} else if (pTagVal->nData == 0) { } else if (pTagVal->nData == 0) {
value = cJSON_CreateString(""); value = cJSON_CreateString("");
if (value == NULL) {
goto end;
}
} else { } else {
goto end; goto end;
} }
cJSON_AddItemToObject(json, tagJsonKey, value); if(!cJSON_AddItemToObject(json, tagJsonKey, value)){
goto end;
}
} else if (type == TSDB_DATA_TYPE_DOUBLE) { } else if (type == TSDB_DATA_TYPE_DOUBLE) {
double jsonVd = *(double*)(&pTagVal->i64); double jsonVd = *(double*)(&pTagVal->i64);
cJSON* value = cJSON_CreateNumber(jsonVd); cJSON* value = cJSON_CreateNumber(jsonVd);
if (value == NULL) { if (value == NULL) {
goto end; goto end;
} }
cJSON_AddItemToObject(json, tagJsonKey, value); if(!cJSON_AddItemToObject(json, tagJsonKey, value)){
goto end;
}
} else if (type == TSDB_DATA_TYPE_BOOL) { } else if (type == TSDB_DATA_TYPE_BOOL) {
char jsonVd = *(char*)(&pTagVal->i64); char jsonVd = *(char*)(&pTagVal->i64);
cJSON* value = cJSON_CreateBool(jsonVd); cJSON* value = cJSON_CreateBool(jsonVd);
if (value == NULL) { if (value == NULL) {
goto end; goto end;
} }
cJSON_AddItemToObject(json, tagJsonKey, value); if(!cJSON_AddItemToObject(json, tagJsonKey, value)){
goto end;
}
} else { } else {
goto end; goto end;
} }
@ -494,7 +511,7 @@ end:
if (string == NULL) { if (string == NULL) {
string = taosStrdup(TSDB_DATA_NULL_STR_L); string = taosStrdup(TSDB_DATA_NULL_STR_L);
} }
return string; *jsonStr = string;
} }
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {

View File

@ -214,7 +214,6 @@ void schedulerDestroy(void) {
} }
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock); SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
taosTmrCleanUp(schMgmt.timer);
qWorkerDestroy(&schMgmt.queryMgmt); qWorkerDestroy(&schMgmt.queryMgmt);
schMgmt.queryMgmt = NULL; schMgmt.queryMgmt = NULL;
} }

View File

@ -280,6 +280,13 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
streamMutexLock(&pInfo->checkInfoLock); streamMutexLock(&pInfo->checkInfoLock);
// drop procedure already started, not start check downstream now
ETaskStatus s = streamTaskGetStatus(pTask).state;
if (s == TASK_STATUS__DROPPING) {
streamMutexUnlock(&pInfo->checkInfoLock);
return;
}
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
streamMutexUnlock(&pInfo->checkInfoLock); streamMutexUnlock(&pInfo->checkInfoLock);

View File

@ -742,7 +742,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
streamMetaRLock(pMeta); streamMetaRLock(pMeta);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) { if (ppTask) {
// to make sure check status will not start the check downstream status when we start to check timerActive count.
streamMutexLock(&pTask->taskCheckInfo.checkInfoLock);
timerActive = (*ppTask)->status.timerActive; timerActive = (*ppTask)->status.timerActive;
streamMutexUnlock(&pTask->taskCheckInfo.checkInfoLock);
} }
streamMetaRUnLock(pMeta); streamMetaRUnLock(pMeta);

View File

@ -435,6 +435,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_STB_NOT_EXIST, "Stable not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER, "Table schema is old") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER, "Table schema is old")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE, "Table already exists in other stables") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE, "Table already exists in other stables")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INCONSISTENT_DB_ID, "Inconsistent database id")
// query // query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")

View File

@ -49,8 +49,7 @@ int32_t tjsonAddIntegerToObject(SJson* pJson, const char* pName, const uint64_t
int32_t tjsonAddDoubleToObject(SJson* pJson, const char* pName, const double number) { int32_t tjsonAddDoubleToObject(SJson* pJson, const char* pName, const double number) {
if (NULL == cJSON_AddNumberToObject((cJSON*)pJson, pName, number)) { if (NULL == cJSON_AddNumberToObject((cJSON*)pJson, pName, number)) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -58,8 +57,7 @@ int32_t tjsonAddDoubleToObject(SJson* pJson, const char* pName, const double num
int32_t tjsonAddBoolToObject(SJson* pJson, const char* pName, const bool boolean) { int32_t tjsonAddBoolToObject(SJson* pJson, const char* pName, const bool boolean) {
if (NULL == cJSON_AddBoolToObject((cJSON*)pJson, pName, boolean)) { if (NULL == cJSON_AddBoolToObject((cJSON*)pJson, pName, boolean)) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -67,8 +65,7 @@ int32_t tjsonAddBoolToObject(SJson* pJson, const char* pName, const bool boolean
int32_t tjsonAddStringToObject(SJson* pJson, const char* pName, const char* pVal) { int32_t tjsonAddStringToObject(SJson* pJson, const char* pName, const char* pVal) {
if (NULL == cJSON_AddStringToObject((cJSON*)pJson, pName, pVal)) { if (NULL == cJSON_AddStringToObject((cJSON*)pJson, pName, pVal)) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -87,8 +84,7 @@ int32_t tjsonAddItemToObject(SJson* pJson, const char* pName, SJson* pItem) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
} }
int32_t tjsonAddItemToArray(SJson* pJson, SJson* pItem) { int32_t tjsonAddItemToArray(SJson* pJson, SJson* pItem) {
@ -96,8 +92,7 @@ int32_t tjsonAddItemToArray(SJson* pJson, SJson* pItem) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
} }
int32_t tjsonAddObject(SJson* pJson, const char* pName, FToJson func, const void* pObj) { int32_t tjsonAddObject(SJson* pJson, const char* pName, FToJson func, const void* pObj) {
@ -106,18 +101,27 @@ int32_t tjsonAddObject(SJson* pJson, const char* pName, FToJson func, const void
} }
SJson* pJobj = tjsonCreateObject(); SJson* pJobj = tjsonCreateObject();
if (NULL == pJobj || TSDB_CODE_SUCCESS != func(pObj, pJobj)) { if (NULL == pJobj) {
return terrno;
}
int32_t rc = func(pObj, pJobj);
if (rc != TSDB_CODE_SUCCESS) {
tjsonDelete(pJobj); tjsonDelete(pJobj);
return TSDB_CODE_FAILED; return rc;
} }
return tjsonAddItemToObject(pJson, pName, pJobj); return tjsonAddItemToObject(pJson, pName, pJobj);
} }
int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj) { int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj) {
SJson* pJobj = tjsonCreateObject(); SJson* pJobj = tjsonCreateObject();
if (NULL == pJobj || TSDB_CODE_SUCCESS != func(pObj, pJobj)) { if (pJobj == NULL) {
return terrno;
}
int32_t rc = func(pObj, pJobj);
if (rc != TSDB_CODE_SUCCESS) {
tjsonDelete(pJobj); tjsonDelete(pJobj);
return TSDB_CODE_FAILED; return rc;
} }
return tjsonAddItemToArray(pJson, pJobj); return tjsonAddItemToArray(pJson, pJobj);
} }
@ -156,9 +160,21 @@ int32_t tjsonAddTArray(SJson* pJson, const char* pName, FToJson func, const SArr
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
char* tjsonToString(const SJson* pJson) { return cJSON_Print((cJSON*)pJson); } char* tjsonToString(const SJson* pJson) {
char* p = cJSON_Print((cJSON*)pJson);
if (!p) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
}
return p;
}
char* tjsonToUnformattedString(const SJson* pJson) { return cJSON_PrintUnformatted((cJSON*)pJson); } char* tjsonToUnformattedString(const SJson* pJson) {
char* p = cJSON_PrintUnformatted((cJSON*)pJson);
if (!p) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
}
return p;
}
SJson* tjsonGetObjectItem(const SJson* pJson, const char* pName) { return cJSON_GetObjectItem(pJson, pName); } SJson* tjsonGetObjectItem(const SJson* pJson, const char* pName) { return cJSON_GetObjectItem(pJson, pName); }

View File

@ -55,6 +55,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/window_close_session_ext.py ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/window_close_session_ext.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/partition_interval.py ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/partition_interval.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/pause_resume_test.py ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/pause_resume_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/state_window_case.py
#,,n,system-test,python3 ./test.py -f 8-stream/vnode_restart.py -N 4 #,,n,system-test,python3 ./test.py -f 8-stream/vnode_restart.py -N 4
#,,n,system-test,python3 ./test.py -f 8-stream/snode_restart.py -N 4 #,,n,system-test,python3 ./test.py -f 8-stream/snode_restart.py -N 4
,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4 ,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4
@ -300,6 +301,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-3581.py ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-3581.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-3311.py ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-3311.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-3821.py ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-3821.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-5130.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/balance_vgroups_r1.py -N 6 ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/balance_vgroups_r1.py -N 6
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosShell.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosShell.py

View File

@ -538,21 +538,21 @@ class TDCom:
tdLog.info("cfgPath: %s" % cfgPath) tdLog.info("cfgPath: %s" % cfgPath)
return cfgPath return cfgPath
def newcon(self,host='localhost',port=6030,user='root',password='taosdata'): def newcon(self,host='localhost',port=6030,user='root',password='taosdata', database=None):
con=taos.connect(host=host, user=user, password=password, port=port) con=taos.connect(host=host, user=user, password=password, port=port, database=database)
# print(con) # print(con)
return con return con
def newcur(self,host='localhost',port=6030,user='root',password='taosdata'): def newcur(self,host='localhost',port=6030,user='root',password='taosdata',database=None):
cfgPath = self.getClientCfgPath() cfgPath = self.getClientCfgPath()
con=taos.connect(host=host, user=user, password=password, config=cfgPath, port=port) con=taos.connect(host=host, user=user, password=password, config=cfgPath, port=port,database=database)
cur=con.cursor() cur=con.cursor()
# print(cur) # print(cur)
return cur return cur
def newTdSql(self, host='localhost',port=6030,user='root',password='taosdata'): def newTdSql(self, host='localhost',port=6030,user='root',password='taosdata', database = None):
newTdSql = TDSql() newTdSql = TDSql()
cur = self.newcur(host=host,port=port,user=user,password=password) cur = self.newcur(host=host,port=port,user=user,password=password, database=database)
newTdSql.init(cur, False) newTdSql.init(cur, False)
return newTdSql return newTdSql

View File

@ -0,0 +1,60 @@
import sys
from util.log import *
from util.sql import *
from util.cases import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag':135,}
def init(self, conn, logSql, replicaVar = 1):
self.replicaVar = replicaVar
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
def init_case(self):
tdLog.debug("==========init case==========")
tdSql.execute("create database test")
tdSql.execute("use test")
tdSql.execute("CREATE STABLE `st_variable_data` (`load_time` TIMESTAMP, `collect_time` TIMESTAMP, `var_value` NCHAR(300)) TAGS (`factory_id` NCHAR(30), `device_code` NCHAR(80), `var_name` NCHAR(120), `var_type` NCHAR(30), `var_address` NCHAR(100), `var_attribute` NCHAR(30), `device_name` NCHAR(150), `var_desc` NCHAR(200), `trigger_value` NCHAR(50), `var_category` NCHAR(50), `var_category_desc` NCHAR(200));")
tdSql.execute('CREATE TABLE aaa using `st_variable_data` tags("a1","a2", "a3","a4","a5","a6","a7","a8","a9","a10","a11")')
time.sleep(2)
def create_stream(self):
tdLog.debug("==========create stream==========")
tdSql.execute("CREATE STREAM stream_device_alarm TRIGGER AT_ONCE DELETE_MARK 30d INTO st_device_alarm tags(factory_id varchar(20), device_code varchar(80), var_name varchar(200))\
as select _wstart start_time, last(load_time) end_time, first(var_value) var_value, (case when lower(var_value)=lower(trigger_value) then '1' else '0' end) state_flag from st_variable_data\
PARTITION BY tbname tname, factory_id, device_code, var_name STATE_WINDOW(case when lower(var_value)=lower(trigger_value) then '1' else '0' end)")
time.sleep(2)
tdSql.execute("CREATE STREAM stream_device_alarm2 TRIGGER AT_ONCE DELETE_MARK 30d INTO st_device_alarm2 tags(factory_id varchar(20), device_code varchar(80), var_name varchar(200))\
as select _wstart start_time, last(load_time) end_time, first(var_value) var_value, 1 state_flag from st_variable_data\
PARTITION BY tbname tname, factory_id, device_code, var_name STATE_WINDOW(case when lower(var_value)=lower(trigger_value) then '1' else '0' end)")
time.sleep(2)
def insert_data(self):
try:
tdSql.execute("insert into aaa values('2024-07-15 14:00:00', '2024-07-15 14:00:00', 'a8')", queryTimes=5, show=True)
time.sleep(0.01)
tdSql.execute("insert into aaa values('2024-07-15 14:10:00', '2024-07-15 14:10:00', 'a9')", queryTimes=5, show=True)
time.sleep(1)
except Exception as error:
tdLog.exit(f"insert data failed {error}")
def run(self):
self.init_case()
self.create_stream()
self.insert_data()
tdSql.query("select state_flag from st_device_alarm")
tdSql.checkData(0, 0, 0, show=True)
tdSql.checkData(1, 0, 1, show=True)
tdSql.query("select state_flag from st_device_alarm2")
tdSql.checkData(0, 0, 1, show=True)
tdSql.checkData(1, 0, 1, show=True)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,45 @@
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
import taos
class TDTestCase:
def init(self, conn, logSQl, replicaVal=1):
self.replicaVar = int(replicaVal)
tdLog.debug(f"start to excute {__file__}")
self.conn = conn
tdSql.init(conn.cursor(), False)
self.passwd = {'root':'taosdata',
'test':'test'}
def prepare_user(self):
tdSql.execute(f"create user test pass 'test' sysinfo 1")
def test_connect_user(self, uname):
try:
for db in ['information_schema', 'performance_schema']:
new_tdsql = tdCom.newTdSql(user=uname, password=self.passwd[uname], database=db)
new_tdsql.query('show databases')
new_tdsql.checkData(0, 0, 'information_schema')
new_tdsql.checkData(1, 0, 'performance_schema')
tdLog.success(f"Test User {uname} for {db} .......[OK]")
except:
tdLog.exit(f'{__file__} failed')
def run(self):
self.prepare_user()
self.test_connect_user('root')
self.test_connect_user('test')
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -171,7 +171,7 @@ int32_t init_env() {
} }
// pass NULL return last error code describe // pass NULL return last error code describe
const char* err = taos_errstr(NULL); const char* err = tmq_err2str(error_code);
printf("write_raw_block return code =0x%x err=%s\n", error_code, err); printf("write_raw_block return code =0x%x err=%s\n", error_code, err);
if(strcmp(err, "success") == 0) { if(strcmp(err, "success") == 0) {
printf("expect failed , but error string is success! err=%s\n", err); printf("expect failed , but error string is success! err=%s\n", err);
@ -185,7 +185,7 @@ int32_t init_env() {
goto END; goto END;
} }
err = taos_errstr(NULL); err = tmq_err2str(error_code);
printf("write_raw_block no exist table return code =0x%x err=%s\n", error_code, err); printf("write_raw_block no exist table return code =0x%x err=%s\n", error_code, err);
if(strcmp(err, "success") == 0) { if(strcmp(err, "success") == 0) {
printf("expect failed write no exist table, but error string is success! err=%s\n", err); printf("expect failed write no exist table, but error string is success! err=%s\n", err);