Merge remote-tracking branch 'origin/3.0' into fix/td_19618

This commit is contained in:
Shengliang Guan 2022-10-22 09:03:43 +08:00
commit c58a77b516
33 changed files with 245 additions and 105 deletions

View File

@ -892,6 +892,7 @@ typedef struct {
int32_t numOfVgroups; int32_t numOfVgroups;
int32_t numOfStables; int32_t numOfStables;
int32_t buffer; int32_t buffer;
int32_t cacheSize;
int32_t pageSize; int32_t pageSize;
int32_t pages; int32_t pages;
int32_t daysPerFile; int32_t daysPerFile;

View File

@ -84,7 +84,7 @@ typedef struct SOutputData {
* @param pHandle output * @param pHandle output
* @return error code * @return error code
*/ */
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam); int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id);
int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat); int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat);

View File

@ -34,7 +34,7 @@ extern "C" {
#define SHOW_CREATE_TB_RESULT_COLS 2 #define SHOW_CREATE_TB_RESULT_COLS 2
#define SHOW_CREATE_TB_RESULT_FIELD1_LEN (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) #define SHOW_CREATE_TB_RESULT_FIELD1_LEN (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE)
#define SHOW_CREATE_TB_RESULT_FIELD2_LEN (TSDB_MAX_BINARY_LEN + VARSTR_HEADER_SIZE) #define SHOW_CREATE_TB_RESULT_FIELD2_LEN (TSDB_MAX_ALLOWED_SQL_LEN * 3)
#define SHOW_LOCAL_VARIABLES_RESULT_COLS 2 #define SHOW_LOCAL_VARIABLES_RESULT_COLS 2
#define SHOW_LOCAL_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE) #define SHOW_LOCAL_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE)

View File

@ -338,7 +338,7 @@ void doDestroyRequest(void *p) {
SRequestObj *pRequest = (SRequestObj *)p; SRequestObj *pRequest = (SRequestObj *)p;
int64_t reqId = pRequest->self; uint64_t reqId = pRequest->requestId;
tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));

View File

@ -893,16 +893,26 @@ void tTagFree(STag *pTag) {
} }
char *tTagValToData(const STagVal *value, bool isJson) { char *tTagValToData(const STagVal *value, bool isJson) {
if (!value) return NULL; if (!value) {
return NULL;
}
char *data = NULL; char *data = NULL;
int8_t typeBytes = 0; int8_t typeBytes = 0;
if (isJson) { if (isJson) {
typeBytes = CHAR_BYTES; typeBytes = CHAR_BYTES;
} }
if (IS_VAR_DATA_TYPE(value->type)) { if (IS_VAR_DATA_TYPE(value->type)) {
data = taosMemoryCalloc(1, typeBytes + VARSTR_HEADER_SIZE + value->nData); data = taosMemoryCalloc(1, typeBytes + VARSTR_HEADER_SIZE + value->nData);
if (data == NULL) return NULL; if (data == NULL) {
if (isJson) *data = value->type; return NULL;
}
if (isJson) {
*data = value->type;
}
varDataLen(data + typeBytes) = value->nData; varDataLen(data + typeBytes) = value->nData;
memcpy(varDataVal(data + typeBytes), value->pData, value->nData); memcpy(varDataVal(data + typeBytes), value->pData, value->nData);
} else { } else {

View File

@ -49,7 +49,7 @@ int32_t tsNumOfMnodeFetchThreads = 1;
int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfMnodeReadThreads = 1;
int32_t tsNumOfVnodeQueryThreads = 4; int32_t tsNumOfVnodeQueryThreads = 4;
int32_t tsNumOfVnodeStreamThreads = 2; int32_t tsNumOfVnodeStreamThreads = 2;
int32_t tsNumOfVnodeFetchThreads = 1; int32_t tsNumOfVnodeFetchThreads = 4;
int32_t tsNumOfVnodeWriteThreads = 2; int32_t tsNumOfVnodeWriteThreads = 2;
int32_t tsNumOfVnodeSyncThreads = 2; int32_t tsNumOfVnodeSyncThreads = 2;
int32_t tsNumOfVnodeRsmaThreads = 2; int32_t tsNumOfVnodeRsmaThreads = 2;
@ -364,8 +364,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4); tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1;
// tsNumOfVnodeFetchThreads = 1; tsNumOfVnodeFetchThreads = tsNumOfCores / 4;
// if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1, 0) != 0) return -1; tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, 0) != 0) return -1;
tsNumOfVnodeWriteThreads = tsNumOfCores; tsNumOfVnodeWriteThreads = tsNumOfCores;
tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1); tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1);
@ -487,7 +488,6 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype; pItem->stype = stype;
} }
/*
pItem = cfgGetItem(tsCfg, "numOfVnodeFetchThreads"); pItem = cfgGetItem(tsCfg, "numOfVnodeFetchThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfVnodeFetchThreads = numOfCores / 4; tsNumOfVnodeFetchThreads = numOfCores / 4;
@ -495,7 +495,6 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->i32 = tsNumOfVnodeFetchThreads; pItem->i32 = tsNumOfVnodeFetchThreads;
pItem->stype = stype; pItem->stype = stype;
} }
*/
pItem = cfgGetItem(tsCfg, "numOfVnodeWriteThreads"); pItem = cfgGetItem(tsCfg, "numOfVnodeWriteThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
@ -688,7 +687,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32; tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
tsNumOfVnodeStreamThreads = cfgGetItem(pCfg, "numOfVnodeStreamThreads")->i32; tsNumOfVnodeStreamThreads = cfgGetItem(pCfg, "numOfVnodeStreamThreads")->i32;
// tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32; tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32; tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32;
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;

View File

@ -2765,6 +2765,7 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
if (tEncodeI32(&encoder, pRsp->numOfVgroups) < 0) return -1; if (tEncodeI32(&encoder, pRsp->numOfVgroups) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->numOfStables) < 0) return -1; if (tEncodeI32(&encoder, pRsp->numOfStables) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->buffer) < 0) return -1; if (tEncodeI32(&encoder, pRsp->buffer) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->cacheSize) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->pageSize) < 0) return -1; if (tEncodeI32(&encoder, pRsp->pageSize) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->pages) < 0) return -1; if (tEncodeI32(&encoder, pRsp->pages) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->daysPerFile) < 0) return -1; if (tEncodeI32(&encoder, pRsp->daysPerFile) < 0) return -1;
@ -2804,6 +2805,7 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
if (tDecodeI32(&decoder, &pRsp->numOfVgroups) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->numOfVgroups) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numOfStables) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->numOfStables) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->buffer) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->buffer) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->cacheSize) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->pageSize) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->pageSize) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->pages) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->pages) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->daysPerFile) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->daysPerFile) < 0) return -1;

View File

@ -848,6 +848,7 @@ static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq) {
cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups; cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups;
cfgRsp.numOfStables = pDb->cfg.numOfStables; cfgRsp.numOfStables = pDb->cfg.numOfStables;
cfgRsp.buffer = pDb->cfg.buffer; cfgRsp.buffer = pDb->cfg.buffer;
cfgRsp.cacheSize = pDb->cfg.cacheLastSize;
cfgRsp.pageSize = pDb->cfg.pageSize; cfgRsp.pageSize = pDb->cfg.pageSize;
cfgRsp.pages = pDb->cfg.pages; cfgRsp.pages = pDb->cfg.pages;
cfgRsp.daysPerFile = pDb->cfg.daysPerFile; cfgRsp.daysPerFile = pDb->cfg.daysPerFile;

View File

@ -550,7 +550,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
// 6. execution // 6. execution
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
ASSERT(0); mError("failed to prepare trans rebalance since %s", terrstr());
goto REB_FAIL; goto REB_FAIL;
} }

View File

@ -167,7 +167,7 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
if (rollback) { if (rollback) {
ASSERT(0); ASSERT(0);
} else { } else {
code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn); code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
if (code) goto _err; if (code) goto _err;
} }

View File

@ -1523,9 +1523,9 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
return pReader->pMemSchema; return pReader->pMemSchema;
} }
taosMemoryFree(pReader->pMemSchema); taosMemoryFreeClear(pReader->pMemSchema);
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema); int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS || pReader->pMemSchema == NULL) {
terrno = code; terrno = code;
return NULL; return NULL;
} else { } else {
@ -2274,7 +2274,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
} }
_end: _end:
pResBlock->info.uid = pBlockScanInfo->uid; pResBlock->info.uid = (pBlockScanInfo != NULL)? pBlockScanInfo->uid:0;
blockDataUpdateTsWindow(pResBlock, 0); blockDataUpdateTsWindow(pResBlock, 0);
setComposedBlockFlag(pReader, true); setComposedBlockFlag(pReader, true);
@ -2569,7 +2569,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
} }
if (pScanInfo == NULL) { if (pScanInfo == NULL) {
tsdbError("failed to get table, uid:%" PRIu64 ", %s", pBlockInfo->uid, pReader->idStr); tsdbError("failed to get table scan-info, %s", pReader->idStr);
code = TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
return code; return code;
} }

View File

@ -268,10 +268,10 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, S
len += sprintf( len += sprintf(
buf2 + VARSTR_HEADER_SIZE, buf2 + VARSTR_HEADER_SIZE,
"CREATE DATABASE `%s` BUFFER %d CACHEMODEL '%s' COMP %d DURATION %dm " "CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm "
"WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d " "WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
"STRICT '%s' WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d", "STRICT '%s' WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d",
dbFName, pCfg->buffer, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile, pCfg->walFsyncPeriod, dbFName, pCfg->buffer, pCfg->cacheSize, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile, pCfg->walFsyncPeriod,
pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->pages, pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->pages,
pCfg->pageSize, prec, pCfg->replications, strictStr(pCfg->strict), pCfg->walLevel, pCfg->numOfVgroups, pCfg->pageSize, prec, pCfg->replications, strictStr(pCfg->strict), pCfg->walLevel, pCfg->numOfVgroups,
1 == pCfg->numOfStables); 1 == pCfg->numOfStables);
@ -496,7 +496,12 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* p
colDataAppend(pCol1, 0, buf1, false); colDataAppend(pCol1, 0, buf1, false);
SColumnInfoData* pCol2 = taosArrayGet(pBlock->pDataBlock, 1); SColumnInfoData* pCol2 = taosArrayGet(pBlock->pDataBlock, 1);
char buf2[SHOW_CREATE_TB_RESULT_FIELD2_LEN] = {0}; char* buf2 = taosMemoryMalloc(SHOW_CREATE_TB_RESULT_FIELD2_LEN);
if (NULL == buf2) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return terrno;
}
int32_t len = 0; int32_t len = 0;
if (TSDB_SUPER_TABLE == pCfg->tableType) { if (TSDB_SUPER_TABLE == pCfg->tableType) {
@ -512,6 +517,7 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* p
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ") TAGS ("); len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ") TAGS (");
code = appendTagValues(buf2, &len, pCfg); code = appendTagValues(buf2, &len, pCfg);
if (code) { if (code) {
taosMemoryFree(buf2);
return code; return code;
} }
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ")"); len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ")");
@ -527,6 +533,8 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* p
colDataAppend(pCol2, 0, buf2, false); colDataAppend(pCol2, 0, buf2, false);
taosMemoryFree(buf2);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -254,10 +254,12 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
void* pParam) { void* pParam) {
int32_t code = TSDB_CODE_SUCCESS;
SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle)); SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle));
if (NULL == deleter) { if (NULL == deleter) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_QRY_OUT_OF_MEMORY; goto _end;
} }
SDataDeleterNode* pDeleterNode = (SDataDeleterNode*)pDataSink; SDataDeleterNode* pDeleterNode = (SDataDeleterNode*)pDataSink;
@ -270,17 +272,30 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData
deleter->pManager = pManager; deleter->pManager = pManager;
deleter->pDeleter = pDeleterNode; deleter->pDeleter = pDeleterNode;
deleter->pSchema = pDataSink->pInputDataBlockDesc; deleter->pSchema = pDataSink->pInputDataBlockDesc;
if(pParam == NULL) {
code = TSDB_CODE_QRY_INVALID_INPUT;
qError("invalid input param in creating data deleter, code%s", tstrerror(code));
goto _end;
}
deleter->pParam = pParam; deleter->pParam = pParam;
deleter->status = DS_BUF_EMPTY; deleter->status = DS_BUF_EMPTY;
deleter->queryEnd = false; deleter->queryEnd = false;
deleter->pDataBlocks = taosOpenQueue(); deleter->pDataBlocks = taosOpenQueue();
taosThreadMutexInit(&deleter->mutex, NULL); taosThreadMutexInit(&deleter->mutex, NULL);
if (NULL == deleter->pDataBlocks) { if (NULL == deleter->pDataBlocks) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
*pHandle = deleter;
return code;
_end:
if (deleter != NULL) {
destroyDataSinker((SDataSinkHandle*)deleter); destroyDataSinker((SDataSinkHandle*)deleter);
taosMemoryFree(deleter); taosMemoryFree(deleter);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
*pHandle = deleter; return code;
return TSDB_CODE_SUCCESS;
} }

View File

@ -231,9 +231,11 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
while (!taosQueueEmpty(pDispatcher->pDataBlocks)) { while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
SDataDispatchBuf* pBuf = NULL; SDataDispatchBuf* pBuf = NULL;
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
if (pBuf != NULL) {
taosMemoryFreeClear(pBuf->pData); taosMemoryFreeClear(pBuf->pData);
taosFreeQitem(pBuf); taosFreeQitem(pBuf);
} }
}
taosCloseQueue(pDispatcher->pDataBlocks); taosCloseQueue(pDispatcher->pDataBlocks);
taosThreadMutexDestroy(&pDispatcher->mutex); taosThreadMutexDestroy(&pDispatcher->mutex);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -33,7 +33,7 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat) {
return 0; return 0;
} }
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam) { int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) {
switch ((int)nodeType(pDataSink)) { switch ((int)nodeType(pDataSink)) {
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle); return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
@ -42,7 +42,9 @@ int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHand
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
return createDataInserter(&gDataSinkManager, pDataSink, pHandle, pParam); return createDataInserter(&gDataSinkManager, pDataSink, pHandle, pParam);
} }
return TSDB_CODE_FAILED;
qError("invalid input node type:%d, %s", nodeType(pDataSink), id);
return TSDB_CODE_QRY_INVALID_INPUT;
} }
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) { int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) {

View File

@ -370,7 +370,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
goto _error; goto _error;
} }
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam); code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pSinkParam); taosMemoryFreeClear(pSinkParam);
} }

View File

@ -3101,7 +3101,9 @@ _error:
destroyAggOperatorInfo(pInfo); destroyAggOperatorInfo(pInfo);
} }
cleanupExprSupp(&pOperator->exprSupp);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }

View File

@ -421,14 +421,14 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode
goto _error; goto _error;
} }
int32_t num = 0; initResultSizeInfo(&pOperator->resultInfo, 4096);
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols); code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str); code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
@ -453,7 +453,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode
_error: _error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
if (pInfo != NULL) {
destroyGroupOperatorInfo(pInfo); destroyGroupOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
return NULL; return NULL;
} }

View File

@ -465,16 +465,14 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows); colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows);
} else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) { } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows); colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows);
if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
taosMemoryFree(data);
}
} else { // todo opt for json tag } else { // todo opt for json tag
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, data, false); colDataAppend(pColInfoData, i, data, false);
} }
} }
if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL &&
IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
taosMemoryFree(data);
}
} }
} }

View File

@ -263,29 +263,14 @@ static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bo
static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray* pRow) { static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray* pRow) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
int32_t type = pFillInfo->pFillCol[i].pExpr->pExpr->nodeType; int32_t type = pFillInfo->pFillCol[i].pExpr->pExpr->nodeType;
if (type == QUERY_NODE_COLUMN) { if (type == QUERY_NODE_COLUMN || type == QUERY_NODE_OPERATOR || type == QUERY_NODE_FUNCTION) {
int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]); int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]);
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId); SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
bool isNull = colDataIsNull_s(pSrcCol, rowIndex); bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
char* p = colDataGetData(pSrcCol, rowIndex); char* p = colDataGetData(pSrcCol, rowIndex);
saveColData(pRow, i, p, isNull);
} else if (type == QUERY_NODE_OPERATOR) {
int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]);
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
char* p = colDataGetData(pSrcCol, rowIndex);
saveColData(pRow, i, p, isNull);
} else if (type == QUERY_NODE_FUNCTION) {
int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]);
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
char* p = colDataGetData(pSrcCol, rowIndex);
saveColData(pRow, i, p, isNull); saveColData(pRow, i, p, isNull);
} else { } else {
ASSERT(0); ASSERT(0);

View File

@ -4411,6 +4411,11 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
taosArrayPush(pInfo->pChildren, &pChildOp); taosArrayPush(pInfo->pChildren, &pChildOp);
} }
} }
if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
}
return pOperator; return pOperator;
_error: _error:

View File

@ -330,7 +330,7 @@ static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterLis
if (NULL == pFunc) { if (NULL == pFunc) {
return NULL; return NULL;
} }
strcpy(pFunc->functionName, pName); snprintf(pFunc->functionName, sizeof(pFunc->functionName), "%s", pName);
pFunc->pParameterList = pParameterList; pFunc->pParameterList = pParameterList;
if (TSDB_CODE_SUCCESS != getFuncInfo(pFunc)) { if (TSDB_CODE_SUCCESS != getFuncInfo(pFunc)) {
pFunc->pParameterList = NULL; pFunc->pParameterList = NULL;
@ -408,10 +408,6 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
*pMergeFunc = pFunc; *pMergeFunc = pFunc;
} else { } else {
if (NULL != pFunc) {
pFunc->pParameterList = NULL;
nodesDestroyNode((SNode*)pFunc);
}
nodesDestroyList(pParameterList); nodesDestroyList(pParameterList);
} }

View File

@ -96,7 +96,9 @@ double findOnlyResult(tMemBucket *pMemBucket) {
} }
int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times); int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times);
SArray *list = *(SArray **)taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId)); SArray **pList = taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId));
if (pList != NULL) {
SArray *list = *pList;
assert(list->size == 1); assert(list->size == 1);
int32_t *pageId = taosArrayGet(list, 0); int32_t *pageId = taosArrayGet(list, 0);
@ -107,6 +109,7 @@ double findOnlyResult(tMemBucket *pMemBucket) {
GET_TYPED_DATA(v, double, pMemBucket->type, pPage->data); GET_TYPED_DATA(v, double, pMemBucket->type, pPage->data);
return v; return v;
} }
}
return 0; return 0;
} }

View File

@ -190,28 +190,20 @@ int32_t nodesReleaseAllocator(int64_t allocatorId) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SNodeAllocator* pAllocator = taosAcquireRef(g_allocatorReqRefPool, allocatorId); if (NULL == g_pNodeAllocator) {
if (NULL == pAllocator) {
return terrno;
}
int32_t code = taosThreadMutexTryLock(&pAllocator->mutex);
if (EBUSY != code) {
nodesError("allocator id %" PRIx64 nodesError("allocator id %" PRIx64
" release failed: The nodesReleaseAllocator function needs to be called after the nodesAcquireAllocator " " release failed: The nodesReleaseAllocator function needs to be called after the nodesAcquireAllocator "
"function is called!", "function is called!",
allocatorId); allocatorId);
if (0 == code) {
taosThreadMutexUnlock(&pAllocator->mutex);
}
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SNodeAllocator* pAllocator = g_pNodeAllocator;
g_pNodeAllocator = NULL; g_pNodeAllocator = NULL;
taosThreadMutexUnlock(&pAllocator->mutex); taosThreadMutexUnlock(&pAllocator->mutex);
return taosReleaseRef(g_allocatorReqRefPool, allocatorId); return taosReleaseRef(g_allocatorReqRefPool, allocatorId);
} }
int64_t nodesMakeAllocatorWeakRef(int64_t allocatorId) { int64_t nodesMakeAllocatorWeakRef(int64_t allocatorId) {
if (allocatorId <= 0) { if (allocatorId <= 0) {
return 0; return 0;

View File

@ -529,7 +529,7 @@ int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) {
} }
if (tDeserializeSDbCfgRsp(msg, msgSize, &out) != 0) { if (tDeserializeSDbCfgRsp(msg, msgSize, &out) != 0) {
qError("tDeserializeSDbCfgRsp failed, msgSize:%d", msgSize); qError("tDeserializeSDbCfgRsp failed, msgSize:%d,dbCfgRsp:%lu", msgSize, sizeof(out));
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;
} }

View File

@ -175,11 +175,15 @@ void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t pr
maxTs = TMAX(maxTs, ts); maxTs = TMAX(maxTs, ts);
SScalableBf *pSBf = getSBf(pInfo, ts); SScalableBf *pSBf = getSBf(pInfo, ts);
if (pSBf) { if (pSBf) {
tScalableBfPut(pSBf, &ts, sizeof(TSKEY)); SUpdateKey updateKey = {
.tbUid = tbUid,
.ts = ts,
};
tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey));
} }
} }
TSKEY *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)); TSKEY *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t));
if (pMaxTs == NULL || *pMaxTs > tbUid) { if (pMaxTs == NULL || *pMaxTs > maxTs) {
taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY)); taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY));
} }
} }

View File

@ -43,7 +43,7 @@ void* rpcOpen(const SRpcInit* pInit) {
return NULL; return NULL;
} }
if (pInit->label) { if (pInit->label) {
tstrncpy(pRpc->label, pInit->label, TSDB_LABEL_LEN); tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label));
} }
pRpc->compressSize = pInit->compressSize; pRpc->compressSize = pInit->compressSize;
@ -75,7 +75,7 @@ void* rpcOpen(const SRpcInit* pInit) {
} }
pRpc->parent = pInit->parent; pRpc->parent = pInit->parent;
if (pInit->user) { if (pInit->user) {
memcpy(pRpc->user, pInit->user, TSDB_UNI_LEN); tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user));
} }
int64_t refId = transAddExHandle(transGetInstMgt(), pRpc); int64_t refId = transAddExHandle(transGetInstMgt(), pRpc);
@ -87,7 +87,7 @@ void rpcClose(void* arg) {
tInfo("start to close rpc"); tInfo("start to close rpc");
transRemoveExHandle(transGetInstMgt(), (int64_t)arg); transRemoveExHandle(transGetInstMgt(), (int64_t)arg);
transReleaseExHandle(transGetInstMgt(), (int64_t)arg); transReleaseExHandle(transGetInstMgt(), (int64_t)arg);
tInfo("rpc is closed"); tInfo("end to close rpc");
return; return;
} }
void rpcCloseImpl(void* arg) { void rpcCloseImpl(void* arg) {

View File

@ -42,6 +42,7 @@ void walCloseRef(SWal *pWal, int64_t refId) {
int32_t walRefVer(SWalRef *pRef, int64_t ver) { int32_t walRefVer(SWalRef *pRef, int64_t ver) {
SWal *pWal = pRef->pWal; SWal *pWal = pRef->pWal;
wDebug("vgId:%d, wal ref version %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, ver, pRef->refId);
if (pRef->refVer != ver) { if (pRef->refVer != ver) {
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) { if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) {

View File

@ -257,6 +257,8 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
int32_t walBeginSnapshot(SWal *pWal, int64_t ver) { int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
pWal->vers.verInSnapshotting = ver; pWal->vers.verInSnapshotting = ver;
wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64,
pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer);
// check file rolling // check file rolling
if (pWal->cfg.retentionPeriod == 0) { if (pWal->cfg.retentionPeriod == 0) {
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
@ -273,6 +275,10 @@ int32_t walEndSnapshot(SWal *pWal) {
int32_t code = 0; int32_t code = 0;
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
int64_t ver = pWal->vers.verInSnapshotting; int64_t ver = pWal->vers.verInSnapshotting;
wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, pWal->cfg.vgId,
ver, pWal->vers.firstVer, pWal->vers.lastVer);
if (ver == -1) { if (ver == -1) {
code = -1; code = -1;
goto END; goto END;
@ -287,7 +293,8 @@ int32_t walEndSnapshot(SWal *pWal) {
if (pIter == NULL) break; if (pIter == NULL) break;
SWalRef *pRef = *(SWalRef **)pIter; SWalRef *pRef = *(SWalRef **)pIter;
if (pRef->refVer == -1) continue; if (pRef->refVer == -1) continue;
ver = TMIN(ver, pRef->refVer); ver = TMIN(ver, pRef->refVer - 1);
wDebug("vgId:%d, wal found ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId);
} }
int deleteCnt = 0; int deleteCnt = 0;
@ -298,7 +305,12 @@ int32_t walEndSnapshot(SWal *pWal) {
SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
if (pInfo) { if (pInfo) {
if (ver >= pInfo->lastVer) { if (ver >= pInfo->lastVer) {
pInfo++; pInfo--;
}
if (POINTER_DISTANCE(pInfo, pWal->fileInfoSet->pData) > 0) {
wDebug("vgId:%d, begin remove from %" PRId64, pWal->cfg.vgId, pInfo->firstVer);
} else {
wDebug("vgId:%d, no remove", pWal->cfg.vgId);
} }
// iterate files, until the searched result // iterate files, until the searched result
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
@ -315,10 +327,12 @@ int32_t walEndSnapshot(SWal *pWal) {
for (int i = 0; i < deleteCnt; i++) { for (int i = 0; i < deleteCnt; i++) {
pInfo = taosArrayGet(pWal->fileInfoSet, i); pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr); walBuildLogName(pWal, pInfo->firstVer, fnameStr);
wDebug("vgId:%d, remove file %s", pWal->cfg.vgId, fnameStr);
if (taosRemoveFile(fnameStr) < 0) { if (taosRemoveFile(fnameStr) < 0) {
goto UPDATE_META; goto UPDATE_META;
} }
walBuildIdxName(pWal, pInfo->firstVer, fnameStr); walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
wDebug("vgId:%d, remove file %s", pWal->cfg.vgId, fnameStr);
if (taosRemoveFile(fnameStr) < 0) { if (taosRemoveFile(fnameStr) < 0) {
ASSERT(0); ASSERT(0);
} }

View File

@ -137,8 +137,10 @@ SBloomFilter *tBloomFilterDecode(SDecoder *pDecoder) {
if (tDecodeU64(pDecoder, pUnits + i) < 0) goto _error; if (tDecodeU64(pDecoder, pUnits + i) < 0) goto _error;
} }
if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) goto _error; if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) goto _error;
pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); /*pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);*/
pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR); /*pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);*/
pBF->hashFn1 = taosFastHash;
pBF->hashFn2 = taosDJB2Hash;
return pBF; return pBF;
_error: _error:

View File

@ -5,7 +5,7 @@ sleep 50
sql connect sql connect
print =============== create database print =============== create database
sql create database test vgroups 1 sql create database test vgroups 1;
sql select * from information_schema.ins_databases sql select * from information_schema.ins_databases
if $rows != 3 then if $rows != 3 then
return -1 return -1
@ -29,4 +29,100 @@ if $rows != 0 then
return -1 return -1
endi endi
sql create database test1 vgroups 4;
sql use test1;
sql create stable st(ts timestamp, a int, b int) tags(t int);
sql create table t1 using st tags(1);
sql create table t2 using st tags(2);
sql create stream stream2 trigger window_close into streamt2 as select _wstart, sum(a) from st interval(10s);
sql create stream stream3 trigger max_delay 1s into streamt3 as select _wstart, sum(a) from st interval(10s);
sql create stream stream4 trigger window_close into streamt4 as select _wstart, sum(a) from t1 interval(10s);
sql create stream stream5 trigger max_delay 1s into streamt5 as select _wstart, sum(a) from t1 interval(10s);
sql create stream stream6 trigger window_close into streamt6 as select _wstart, sum(a) from st session(ts, 10s);
sql create stream stream7 trigger max_delay 1s into streamt7 as select _wstart, sum(a) from st session(ts, 10s);
sql create stream stream8 trigger window_close into streamt8 as select _wstart, sum(a) from t1 session(ts, 10s);
sql create stream stream9 trigger max_delay 1s into streamt9 as select _wstart, sum(a) from t1 session(ts, 10s);
sql create stream stream10 trigger window_close into streamt10 as select _wstart, sum(a) from t1 state_window(b);
sql create stream stream11 trigger max_delay 1s into streamt11 as select _wstart, sum(a) from t1 state_window(b);
sql insert into t1 values(1648791213000,1,1);
sql insert into t1 values(1648791213001,2,1);
sql insert into t1 values(1648791213002,3,1);
sql insert into t1 values(1648791233000,4,2);
$loop_count = 0
loop1:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt2;
if $rows != 1 then
print ======streamt2=$rows
return -1
endi
sql select * from streamt3;
if $rows != 2 then
print ======streamt3=$rows
goto loop1
endi
sql select * from streamt4;
if $rows != 1 then
print ======streamt4=$rows
return -1
endi
sql select * from streamt5;
if $rows != 2 then
print ======streamt5=$rows
goto loop1
endi
sql select * from streamt6;
if $rows != 1 then
print ======streamt6=$rows
return -1
endi
sql select * from streamt7;
if $rows != 2 then
print ======streamt7=$rows
goto loop1
endi
sql select * from streamt8;
if $rows != 1 then
print ======streamt8=$rows
return -1
endi
sql select * from streamt9;
if $rows != 2 then
print ======streamt9=$rows
goto loop1
endi
sql select * from streamt10;
if $rows != 1 then
print ======streamt10=$rows
return -1
endi
sql select * from streamt11;
if $rows != 2 then
print ======streamt11=$rows
goto loop1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -81,6 +81,7 @@ class TDTestCase:
dbname = "test" dbname = "test"
stb = f"{dbname}.meters" stb = f"{dbname}.meters"
self.installTaosd(bPath,cPath) self.installTaosd(bPath,cPath)
os.system("echo 'debugFlag 143' > /etc/taos/taos.cfg ")
tableNumbers=100 tableNumbers=100
recordNumbers1=100 recordNumbers1=100
recordNumbers2=1000 recordNumbers2=1000
@ -96,8 +97,8 @@ class TDTestCase:
tdLog.info(f"Base client version is {oldClientVersion}") tdLog.info(f"Base client version is {oldClientVersion}")
tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{oldServerVersion}") tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{oldServerVersion}")
tdLog.info(f"taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
sleep(3) sleep(3)
# tdsqlF.query(f"select count(*) from {stb}") # tdsqlF.query(f"select count(*) from {stb}")

1
tests/system-test/fulltest.sh Executable file → Normal file
View File

@ -610,4 +610,3 @@ python3 ./test.py -f 2-query/last_row.py -Q 4
python3 ./test.py -f 2-query/tsbsQuery.py -Q 4 python3 ./test.py -f 2-query/tsbsQuery.py -Q 4
#python3 ./test.py -f 2-query/sml.py -Q 4 #python3 ./test.py -f 2-query/sml.py -Q 4
python3 ./test.py -f 2-query/interp.py -Q 4 python3 ./test.py -f 2-query/interp.py -Q 4