diff --git a/include/common/tmsg.h b/include/common/tmsg.h index dbca38526a..c58cc3c7c5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -892,6 +892,7 @@ typedef struct { int32_t numOfVgroups; int32_t numOfStables; int32_t buffer; + int32_t cacheSize; int32_t pageSize; int32_t pages; int32_t daysPerFile; diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 816a53ad96..8a02f372d1 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -84,7 +84,7 @@ typedef struct SOutputData { * @param pHandle output * @return error code */ -int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam); +int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id); int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat); diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 0752512951..ae550e0c08 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -34,7 +34,7 @@ extern "C" { #define SHOW_CREATE_TB_RESULT_COLS 2 #define SHOW_CREATE_TB_RESULT_FIELD1_LEN (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) -#define SHOW_CREATE_TB_RESULT_FIELD2_LEN (TSDB_MAX_BINARY_LEN + VARSTR_HEADER_SIZE) +#define SHOW_CREATE_TB_RESULT_FIELD2_LEN (TSDB_MAX_ALLOWED_SQL_LEN * 3) #define SHOW_LOCAL_VARIABLES_RESULT_COLS 2 #define SHOW_LOCAL_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index b3c5565cfb..be0e6d50dc 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -338,7 +338,7 @@ void doDestroyRequest(void *p) { SRequestObj *pRequest = (SRequestObj *)p; - int64_t reqId = pRequest->self; + uint64_t reqId = pRequest->requestId; tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index a11c67c1fd..513c54c7e9 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -893,16 +893,26 @@ void tTagFree(STag *pTag) { } char *tTagValToData(const STagVal *value, bool isJson) { - if (!value) return NULL; + if (!value) { + return NULL; + } + char *data = NULL; int8_t typeBytes = 0; if (isJson) { typeBytes = CHAR_BYTES; } + if (IS_VAR_DATA_TYPE(value->type)) { data = taosMemoryCalloc(1, typeBytes + VARSTR_HEADER_SIZE + value->nData); - if (data == NULL) return NULL; - if (isJson) *data = value->type; + if (data == NULL) { + return NULL; + } + + if (isJson) { + *data = value->type; + } + varDataLen(data + typeBytes) = value->nData; memcpy(varDataVal(data + typeBytes), value->pData, value->nData); } else { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3edeeee49a..5025cbcac5 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -49,7 +49,7 @@ int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfVnodeQueryThreads = 4; int32_t tsNumOfVnodeStreamThreads = 2; -int32_t tsNumOfVnodeFetchThreads = 1; +int32_t tsNumOfVnodeFetchThreads = 4; int32_t tsNumOfVnodeWriteThreads = 2; int32_t tsNumOfVnodeSyncThreads = 2; int32_t tsNumOfVnodeRsmaThreads = 2; @@ -364,8 +364,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1; - // tsNumOfVnodeFetchThreads = 1; - // if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1, 0) != 0) return -1; + tsNumOfVnodeFetchThreads = tsNumOfCores / 4; + tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); + if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, 0) != 0) return -1; tsNumOfVnodeWriteThreads = tsNumOfCores; tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1); @@ -487,15 +488,13 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } - /* - pItem = cfgGetItem(tsCfg, "numOfVnodeFetchThreads"); - if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { - tsNumOfVnodeFetchThreads = numOfCores / 4; - tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); - pItem->i32 = tsNumOfVnodeFetchThreads; - pItem->stype = stype; - } - */ + pItem = cfgGetItem(tsCfg, "numOfVnodeFetchThreads"); + if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { + tsNumOfVnodeFetchThreads = numOfCores / 4; + tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); + pItem->i32 = tsNumOfVnodeFetchThreads; + pItem->stype = stype; + } pItem = cfgGetItem(tsCfg, "numOfVnodeWriteThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { @@ -688,7 +687,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32; tsNumOfVnodeStreamThreads = cfgGetItem(pCfg, "numOfVnodeStreamThreads")->i32; - // tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32; + tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32; tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32; tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 81cdd0e5e0..4fd1187fef 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2765,6 +2765,7 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) { if (tEncodeI32(&encoder, pRsp->numOfVgroups) < 0) return -1; if (tEncodeI32(&encoder, pRsp->numOfStables) < 0) return -1; if (tEncodeI32(&encoder, pRsp->buffer) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->cacheSize) < 0) return -1; if (tEncodeI32(&encoder, pRsp->pageSize) < 0) return -1; if (tEncodeI32(&encoder, pRsp->pages) < 0) return -1; if (tEncodeI32(&encoder, pRsp->daysPerFile) < 0) return -1; @@ -2804,6 +2805,7 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) { if (tDecodeI32(&decoder, &pRsp->numOfVgroups) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->numOfStables) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->buffer) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->cacheSize) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->pageSize) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->pages) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->daysPerFile) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 13990f4ba7..1fa886b722 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -848,6 +848,7 @@ static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq) { cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups; cfgRsp.numOfStables = pDb->cfg.numOfStables; cfgRsp.buffer = pDb->cfg.buffer; + cfgRsp.cacheSize = pDb->cfg.cacheLastSize; cfgRsp.pageSize = pDb->cfg.pageSize; cfgRsp.pages = pDb->cfg.pages; cfgRsp.daysPerFile = pDb->cfg.daysPerFile; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 739b8bbf01..eea79c5335 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -550,7 +550,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu // 6. execution if (mndTransPrepare(pMnode, pTrans) != 0) { - ASSERT(0); + mError("failed to prepare trans rebalance since %s", terrstr()); goto REB_FAIL; } diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index b4a7ce7737..92e5f8df7a 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -167,7 +167,7 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { if (rollback) { ASSERT(0); } else { - code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn); + code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn); if (code) goto _err; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 11cae00358..88bbf67758 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1523,9 +1523,9 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* return pReader->pMemSchema; } - taosMemoryFree(pReader->pMemSchema); + taosMemoryFreeClear(pReader->pMemSchema); int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || pReader->pMemSchema == NULL) { terrno = code; return NULL; } else { @@ -2274,7 +2274,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } _end: - pResBlock->info.uid = pBlockScanInfo->uid; + pResBlock->info.uid = (pBlockScanInfo != NULL)? pBlockScanInfo->uid:0; blockDataUpdateTsWindow(pResBlock, 0); setComposedBlockFlag(pReader, true); @@ -2569,7 +2569,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } if (pScanInfo == NULL) { - tsdbError("failed to get table, uid:%" PRIu64 ", %s", pBlockInfo->uid, pReader->idStr); + tsdbError("failed to get table scan-info, %s", pReader->idStr); code = TSDB_CODE_INVALID_PARA; return code; } diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 095d2b093d..47a904bba2 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -268,10 +268,10 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, S len += sprintf( buf2 + VARSTR_HEADER_SIZE, - "CREATE DATABASE `%s` BUFFER %d CACHEMODEL '%s' COMP %d DURATION %dm " + "CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm " "WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d " "STRICT '%s' WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d", - dbFName, pCfg->buffer, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile, pCfg->walFsyncPeriod, + dbFName, pCfg->buffer, pCfg->cacheSize, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile, pCfg->walFsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->pages, pCfg->pageSize, prec, pCfg->replications, strictStr(pCfg->strict), pCfg->walLevel, pCfg->numOfVgroups, 1 == pCfg->numOfStables); @@ -496,7 +496,12 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* p colDataAppend(pCol1, 0, buf1, false); SColumnInfoData* pCol2 = taosArrayGet(pBlock->pDataBlock, 1); - char buf2[SHOW_CREATE_TB_RESULT_FIELD2_LEN] = {0}; + char* buf2 = taosMemoryMalloc(SHOW_CREATE_TB_RESULT_FIELD2_LEN); + if (NULL == buf2) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + return terrno; + } + int32_t len = 0; if (TSDB_SUPER_TABLE == pCfg->tableType) { @@ -512,6 +517,7 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* p len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ") TAGS ("); code = appendTagValues(buf2, &len, pCfg); if (code) { + taosMemoryFree(buf2); return code; } len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ")"); @@ -527,6 +533,8 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* p colDataAppend(pCol2, 0, buf2, false); + taosMemoryFree(buf2); + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 0714d0f3ac..2ed83a6469 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -254,10 +254,12 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) { int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam) { + int32_t code = TSDB_CODE_SUCCESS; + SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle)); if (NULL == deleter) { - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return TSDB_CODE_QRY_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; } SDataDeleterNode* pDeleterNode = (SDataDeleterNode*)pDataSink; @@ -270,17 +272,30 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData deleter->pManager = pManager; deleter->pDeleter = pDeleterNode; deleter->pSchema = pDataSink->pInputDataBlockDesc; + + if(pParam == NULL) { + code = TSDB_CODE_QRY_INVALID_INPUT; + qError("invalid input param in creating data deleter, code%s", tstrerror(code)); + goto _end; + } + deleter->pParam = pParam; deleter->status = DS_BUF_EMPTY; deleter->queryEnd = false; deleter->pDataBlocks = taosOpenQueue(); taosThreadMutexInit(&deleter->mutex, NULL); if (NULL == deleter->pDataBlocks) { - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + + *pHandle = deleter; + return code; + + _end: + if (deleter != NULL) { destroyDataSinker((SDataSinkHandle*)deleter); taosMemoryFree(deleter); - return TSDB_CODE_QRY_OUT_OF_MEMORY; } - *pHandle = deleter; - return TSDB_CODE_SUCCESS; + return code; } diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index ffdcf48d48..b758e4b1dd 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -231,8 +231,10 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { while (!taosQueueEmpty(pDispatcher->pDataBlocks)) { SDataDispatchBuf* pBuf = NULL; taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); - taosMemoryFreeClear(pBuf->pData); - taosFreeQitem(pBuf); + if (pBuf != NULL) { + taosMemoryFreeClear(pBuf->pData); + taosFreeQitem(pBuf); + } } taosCloseQueue(pDispatcher->pDataBlocks); taosThreadMutexDestroy(&pDispatcher->mutex); diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 206f3719fa..2b50be33ad 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -33,7 +33,7 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat) { return 0; } -int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam) { +int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) { switch ((int)nodeType(pDataSink)) { case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle); @@ -42,7 +42,9 @@ int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHand case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: return createDataInserter(&gDataSinkManager, pDataSink, pHandle, pParam); } - return TSDB_CODE_FAILED; + + qError("invalid input node type:%d, %s", nodeType(pDataSink), id); + return TSDB_CODE_QRY_INVALID_INPUT; } int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index cad3e3c44c..fb4248e886 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -370,7 +370,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, goto _error; } - code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam); + code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str); if (code != TSDB_CODE_SUCCESS) { taosMemoryFreeClear(pSinkParam); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1d99a32cf9..c9711f13dd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3101,7 +3101,9 @@ _error: destroyAggOperatorInfo(pInfo); } + cleanupExprSupp(&pOperator->exprSupp); taosMemoryFreeClear(pOperator); + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; return NULL; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 53acf31330..60794fc22a 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -421,14 +421,14 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode goto _error; } - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + initResultSizeInfo(&pOperator->resultInfo, 4096); code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols); if (code != TSDB_CODE_SUCCESS) { goto _error; } - initResultSizeInfo(&pOperator->resultInfo, 4096); + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -453,7 +453,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode _error: pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - destroyGroupOperatorInfo(pInfo); + if (pInfo != NULL) { + destroyGroupOperatorInfo(pInfo); + } taosMemoryFreeClear(pOperator); return NULL; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fa71e8efa6..4e1b07e662 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -465,16 +465,14 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows); } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) { colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows); + if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) { + taosMemoryFree(data); + } } else { // todo opt for json tag for (int32_t i = 0; i < pBlock->info.rows; ++i) { colDataAppend(pColInfoData, i, data, false); } } - - if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL && - IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) { - taosMemoryFree(data); - } } } diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 7d54d243ef..d1d22dc3e5 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -263,29 +263,14 @@ static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bo static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray* pRow) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { int32_t type = pFillInfo->pFillCol[i].pExpr->pExpr->nodeType; - if (type == QUERY_NODE_COLUMN) { + if (type == QUERY_NODE_COLUMN || type == QUERY_NODE_OPERATOR || type == QUERY_NODE_FUNCTION) { int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]); SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId); bool isNull = colDataIsNull_s(pSrcCol, rowIndex); char* p = colDataGetData(pSrcCol, rowIndex); - saveColData(pRow, i, p, isNull); - } else if (type == QUERY_NODE_OPERATOR) { - int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]); - SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId); - - bool isNull = colDataIsNull_s(pSrcCol, rowIndex); - char* p = colDataGetData(pSrcCol, rowIndex); - saveColData(pRow, i, p, isNull); - } else if (type == QUERY_NODE_FUNCTION) { - int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]); - - SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId); - - bool isNull = colDataIsNull_s(pSrcCol, rowIndex); - char* p = colDataGetData(pSrcCol, rowIndex); saveColData(pRow, i, p, isNull); } else { ASSERT(0); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 13ec8505fd..e89cd5e537 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4411,6 +4411,11 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream taosArrayPush(pInfo->pChildren, &pChildOp); } } + + if (!IS_FINAL_OP(pInfo) || numOfChild == 0) { + pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; + } + return pOperator; _error: diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 40af7bb567..fde9084ae3 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -330,7 +330,7 @@ static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterLis if (NULL == pFunc) { return NULL; } - strcpy(pFunc->functionName, pName); + snprintf(pFunc->functionName, sizeof(pFunc->functionName), "%s", pName); pFunc->pParameterList = pParameterList; if (TSDB_CODE_SUCCESS != getFuncInfo(pFunc)) { pFunc->pParameterList = NULL; @@ -408,10 +408,6 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio if (TSDB_CODE_SUCCESS == code) { *pMergeFunc = pFunc; } else { - if (NULL != pFunc) { - pFunc->pParameterList = NULL; - nodesDestroyNode((SNode*)pFunc); - } nodesDestroyList(pParameterList); } diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 62c5e4b28b..6a1427c63f 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -96,16 +96,19 @@ double findOnlyResult(tMemBucket *pMemBucket) { } int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times); - SArray *list = *(SArray **)taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId)); - assert(list->size == 1); + SArray **pList = taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId)); + if (pList != NULL) { + SArray *list = *pList; + assert(list->size == 1); - int32_t *pageId = taosArrayGet(list, 0); - SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId); - assert(pPage->num == 1); + int32_t *pageId = taosArrayGet(list, 0); + SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId); + assert(pPage->num == 1); - double v = 0; - GET_TYPED_DATA(v, double, pMemBucket->type, pPage->data); - return v; + double v = 0; + GET_TYPED_DATA(v, double, pMemBucket->type, pPage->data); + return v; + } } return 0; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index e647438800..aec4d0148a 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -190,28 +190,20 @@ int32_t nodesReleaseAllocator(int64_t allocatorId) { return TSDB_CODE_SUCCESS; } - SNodeAllocator* pAllocator = taosAcquireRef(g_allocatorReqRefPool, allocatorId); - if (NULL == pAllocator) { - return terrno; - } - - int32_t code = taosThreadMutexTryLock(&pAllocator->mutex); - if (EBUSY != code) { + if (NULL == g_pNodeAllocator) { nodesError("allocator id %" PRIx64 " release failed: The nodesReleaseAllocator function needs to be called after the nodesAcquireAllocator " "function is called!", allocatorId); - if (0 == code) { - taosThreadMutexUnlock(&pAllocator->mutex); - } return TSDB_CODE_FAILED; } - + SNodeAllocator* pAllocator = g_pNodeAllocator; g_pNodeAllocator = NULL; taosThreadMutexUnlock(&pAllocator->mutex); return taosReleaseRef(g_allocatorReqRefPool, allocatorId); } + int64_t nodesMakeAllocatorWeakRef(int64_t allocatorId) { if (allocatorId <= 0) { return 0; diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index fadd07a9f3..953abd5956 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -529,7 +529,7 @@ int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) { } if (tDeserializeSDbCfgRsp(msg, msgSize, &out) != 0) { - qError("tDeserializeSDbCfgRsp failed, msgSize:%d", msgSize); + qError("tDeserializeSDbCfgRsp failed, msgSize:%d,dbCfgRsp:%lu", msgSize, sizeof(out)); return TSDB_CODE_INVALID_MSG; } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 80410568e5..199892c241 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -175,11 +175,15 @@ void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t pr maxTs = TMAX(maxTs, ts); SScalableBf *pSBf = getSBf(pInfo, ts); if (pSBf) { - tScalableBfPut(pSBf, &ts, sizeof(TSKEY)); + SUpdateKey updateKey = { + .tbUid = tbUid, + .ts = ts, + }; + tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey)); } } TSKEY *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)); - if (pMaxTs == NULL || *pMaxTs > tbUid) { + if (pMaxTs == NULL || *pMaxTs > maxTs) { taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY)); } } diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index f784b4c53e..b649b1e6c1 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -43,7 +43,7 @@ void* rpcOpen(const SRpcInit* pInit) { return NULL; } if (pInit->label) { - tstrncpy(pRpc->label, pInit->label, TSDB_LABEL_LEN); + tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); } pRpc->compressSize = pInit->compressSize; @@ -75,7 +75,7 @@ void* rpcOpen(const SRpcInit* pInit) { } pRpc->parent = pInit->parent; if (pInit->user) { - memcpy(pRpc->user, pInit->user, TSDB_UNI_LEN); + tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user)); } int64_t refId = transAddExHandle(transGetInstMgt(), pRpc); @@ -87,7 +87,7 @@ void rpcClose(void* arg) { tInfo("start to close rpc"); transRemoveExHandle(transGetInstMgt(), (int64_t)arg); transReleaseExHandle(transGetInstMgt(), (int64_t)arg); - tInfo("rpc is closed"); + tInfo("end to close rpc"); return; } void rpcCloseImpl(void* arg) { diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index 2c45fbbdaf..119d0575d8 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -42,6 +42,7 @@ void walCloseRef(SWal *pWal, int64_t refId) { int32_t walRefVer(SWalRef *pRef, int64_t ver) { SWal *pWal = pRef->pWal; + wDebug("vgId:%d, wal ref version %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, ver, pRef->refId); if (pRef->refVer != ver) { taosThreadMutexLock(&pWal->mutex); if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) { diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index d9aedff577..527ffa0056 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -257,6 +257,8 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) { int32_t walBeginSnapshot(SWal *pWal, int64_t ver) { pWal->vers.verInSnapshotting = ver; + wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, + pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer); // check file rolling if (pWal->cfg.retentionPeriod == 0) { taosThreadMutexLock(&pWal->mutex); @@ -273,6 +275,10 @@ int32_t walEndSnapshot(SWal *pWal) { int32_t code = 0; taosThreadMutexLock(&pWal->mutex); int64_t ver = pWal->vers.verInSnapshotting; + + wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, pWal->cfg.vgId, + ver, pWal->vers.firstVer, pWal->vers.lastVer); + if (ver == -1) { code = -1; goto END; @@ -287,7 +293,8 @@ int32_t walEndSnapshot(SWal *pWal) { if (pIter == NULL) break; SWalRef *pRef = *(SWalRef **)pIter; if (pRef->refVer == -1) continue; - ver = TMIN(ver, pRef->refVer); + ver = TMIN(ver, pRef->refVer - 1); + wDebug("vgId:%d, wal found ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId); } int deleteCnt = 0; @@ -298,7 +305,12 @@ int32_t walEndSnapshot(SWal *pWal) { SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); if (pInfo) { if (ver >= pInfo->lastVer) { - pInfo++; + pInfo--; + } + if (POINTER_DISTANCE(pInfo, pWal->fileInfoSet->pData) > 0) { + wDebug("vgId:%d, begin remove from %" PRId64, pWal->cfg.vgId, pInfo->firstVer); + } else { + wDebug("vgId:%d, no remove", pWal->cfg.vgId); } // iterate files, until the searched result for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { @@ -315,10 +327,12 @@ int32_t walEndSnapshot(SWal *pWal) { for (int i = 0; i < deleteCnt; i++) { pInfo = taosArrayGet(pWal->fileInfoSet, i); walBuildLogName(pWal, pInfo->firstVer, fnameStr); + wDebug("vgId:%d, remove file %s", pWal->cfg.vgId, fnameStr); if (taosRemoveFile(fnameStr) < 0) { goto UPDATE_META; } walBuildIdxName(pWal, pInfo->firstVer, fnameStr); + wDebug("vgId:%d, remove file %s", pWal->cfg.vgId, fnameStr); if (taosRemoveFile(fnameStr) < 0) { ASSERT(0); } @@ -409,7 +423,7 @@ END: } static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { - SWalIdxEntry entry = {.ver = ver, .offset = offset}; + SWalIdxEntry entry = {.ver = ver, .offset = offset}; SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal); ASSERT(pFileInfo != NULL); ASSERT(pFileInfo->firstVer >= 0); @@ -429,7 +443,7 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { if (endOffset < 0) { wFatal("vgId:%d, failed to seek end of idxfile due to %s. ver:%" PRId64 "", pWal->cfg.vgId, strerror(errno), ver); } - ASSERT(endOffset == idxOffset + sizeof(SWalIdxEntry) && "Offset of idx entries misaligned"); + ASSERT(endOffset == idxOffset + sizeof(SWalIdxEntry) && "Offset of idx entries misaligned"); return 0; } @@ -437,7 +451,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy const void *body, int32_t bodyLen) { int64_t code = 0; - int64_t offset = walGetCurFileOffset(pWal); + int64_t offset = walGetCurFileOffset(pWal); SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal); ASSERT(pFileInfo != NULL); diff --git a/source/util/src/tbloomfilter.c b/source/util/src/tbloomfilter.c index b9c96dd606..84a78f3477 100644 --- a/source/util/src/tbloomfilter.c +++ b/source/util/src/tbloomfilter.c @@ -137,8 +137,10 @@ SBloomFilter *tBloomFilterDecode(SDecoder *pDecoder) { if (tDecodeU64(pDecoder, pUnits + i) < 0) goto _error; } if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) goto _error; - pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); - pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR); + /*pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);*/ + /*pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);*/ + pBF->hashFn1 = taosFastHash; + pBF->hashFn2 = taosDJB2Hash; return pBF; _error: diff --git a/tests/script/tsim/stream/windowClose.sim b/tests/script/tsim/stream/windowClose.sim index 0d435a9fbd..9fcdcfb959 100644 --- a/tests/script/tsim/stream/windowClose.sim +++ b/tests/script/tsim/stream/windowClose.sim @@ -5,7 +5,7 @@ sleep 50 sql connect print =============== create database -sql create database test vgroups 1 +sql create database test vgroups 1; sql select * from information_schema.ins_databases if $rows != 3 then return -1 @@ -29,4 +29,100 @@ if $rows != 0 then return -1 endi + +sql create database test1 vgroups 4; +sql use test1; +sql create stable st(ts timestamp, a int, b int) tags(t int); +sql create table t1 using st tags(1); +sql create table t2 using st tags(2); + +sql create stream stream2 trigger window_close into streamt2 as select _wstart, sum(a) from st interval(10s); +sql create stream stream3 trigger max_delay 1s into streamt3 as select _wstart, sum(a) from st interval(10s); +sql create stream stream4 trigger window_close into streamt4 as select _wstart, sum(a) from t1 interval(10s); +sql create stream stream5 trigger max_delay 1s into streamt5 as select _wstart, sum(a) from t1 interval(10s); +sql create stream stream6 trigger window_close into streamt6 as select _wstart, sum(a) from st session(ts, 10s); +sql create stream stream7 trigger max_delay 1s into streamt7 as select _wstart, sum(a) from st session(ts, 10s); +sql create stream stream8 trigger window_close into streamt8 as select _wstart, sum(a) from t1 session(ts, 10s); +sql create stream stream9 trigger max_delay 1s into streamt9 as select _wstart, sum(a) from t1 session(ts, 10s); +sql create stream stream10 trigger window_close into streamt10 as select _wstart, sum(a) from t1 state_window(b); +sql create stream stream11 trigger max_delay 1s into streamt11 as select _wstart, sum(a) from t1 state_window(b); + +sql insert into t1 values(1648791213000,1,1); +sql insert into t1 values(1648791213001,2,1); +sql insert into t1 values(1648791213002,3,1); + +sql insert into t1 values(1648791233000,4,2); + +$loop_count = 0 + +loop1: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt2; + +if $rows != 1 then + print ======streamt2=$rows + return -1 +endi + +sql select * from streamt3; +if $rows != 2 then + print ======streamt3=$rows + goto loop1 +endi + +sql select * from streamt4; +if $rows != 1 then + print ======streamt4=$rows + return -1 +endi + +sql select * from streamt5; +if $rows != 2 then + print ======streamt5=$rows + goto loop1 +endi + +sql select * from streamt6; +if $rows != 1 then + print ======streamt6=$rows + return -1 +endi + +sql select * from streamt7; +if $rows != 2 then + print ======streamt7=$rows + goto loop1 +endi + +sql select * from streamt8; +if $rows != 1 then + print ======streamt8=$rows + return -1 +endi + +sql select * from streamt9; +if $rows != 2 then + print ======streamt9=$rows + goto loop1 +endi + +sql select * from streamt10; +if $rows != 1 then + print ======streamt10=$rows + return -1 +endi + +sql select * from streamt11; +if $rows != 2 then + print ======streamt11=$rows + goto loop1 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/0-others/compatibility.py b/tests/system-test/0-others/compatibility.py index 25b023bb76..e81579a9e4 100644 --- a/tests/system-test/0-others/compatibility.py +++ b/tests/system-test/0-others/compatibility.py @@ -81,6 +81,7 @@ class TDTestCase: dbname = "test" stb = f"{dbname}.meters" self.installTaosd(bPath,cPath) + os.system("echo 'debugFlag 143' > /etc/taos/taos.cfg ") tableNumbers=100 recordNumbers1=100 recordNumbers2=1000 @@ -96,8 +97,8 @@ class TDTestCase: tdLog.info(f"Base client version is {oldClientVersion}") tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{oldServerVersion}") - tdLog.info(f"taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") - os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") + tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") + os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") sleep(3) # tdsqlF.query(f"select count(*) from {stb}") diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh old mode 100755 new mode 100644 index 3d6c2ecb94..a89b41cac6 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -610,4 +610,3 @@ python3 ./test.py -f 2-query/last_row.py -Q 4 python3 ./test.py -f 2-query/tsbsQuery.py -Q 4 #python3 ./test.py -f 2-query/sml.py -Q 4 python3 ./test.py -f 2-query/interp.py -Q 4 -