diff --git a/docs/zh/14-reference/03-taos-sql/03-table.md b/docs/zh/14-reference/03-taos-sql/03-table.md index ca2170db8b..cf29ef1451 100644 --- a/docs/zh/14-reference/03-taos-sql/03-table.md +++ b/docs/zh/14-reference/03-taos-sql/03-table.md @@ -6,7 +6,7 @@ description: 对表的各种管理操作 ## 创建表 -`CREATE TABLE` 语句用于创建普通表和以超级表为模板创建子表。 +`CREATE TABLE` 语句用于创建普通表和以超级表为模板创建子表(也可以通过指定 TAGS 字段创建超级表)。 ```sql CREATE TABLE [IF NOT EXISTS] [db_name.]tb_name (create_definition [, create_definition] ...) [table_options] diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 0b34e882c8..292e7f561a 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -182,7 +182,7 @@ void tColDataClear(SColData *pColData); void tColDataDeepClear(SColData *pColData); int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal); int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, bool forward); -void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal); +int32_t tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal); uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal); int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg); void tColDataArrGetRowKey(SColData *aColData, int32_t nColData, int32_t iRow, SRowKey *key); diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index acd118acc9..d48142811c 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -848,6 +848,7 @@ static int stmtSetDbName2(TAOS_STMT2* stmt, const char* dbName) { STMT_DLOG("start to set dbName: %s", dbName); pStmt->db = taosStrdup(dbName); + (void)strdequote(pStmt->db); STMT_ERR_RET(stmtCreateRequest(pStmt)); // The SQL statement specifies a database name, overriding the previously specified database diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index f1aacfed15..3efccf23b4 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -2671,8 +2671,13 @@ static void (*tColDataGetValueImpl[])(SColData *pColData, int32_t iVal, SColVal tColDataGetValue6, // HAS_VALUE | HAS_NULL tColDataGetValue7 // HAS_VALUE | HAS_NULL | HAS_NONE }; -void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) { +int32_t tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) { + if (iVal < 0 || iVal >= pColData->nVal || + (pColData->flag <= 0 || pColData->flag >= sizeof(tColDataGetValueImpl)/POINTER_BYTES)){ + return TSDB_CODE_INVALID_PARA; + } tColDataGetValueImpl[pColData->flag](pColData, iVal, pColVal); + return TSDB_CODE_SUCCESS; } uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal) { @@ -3436,7 +3441,10 @@ static int32_t tColDataCopyRowAppend(SColData *aFromColData, int32_t iFromRow, S for (int32_t i = 0; i < nColData; i++) { SColVal cv = {0}; - tColDataGetValue(&aFromColData[i], iFromRow, &cv); + code = tColDataGetValue(&aFromColData[i], iFromRow, &cv); + if (code != TSDB_CODE_SUCCESS) { + return code; + } code = tColDataAppendValue(&aToColData[i], &cv); if (code != TSDB_CODE_SUCCESS) { return code; @@ -3575,7 +3583,10 @@ static int32_t tColDataMerge(SArray **colArr) { SColData *dstCol = taosArrayGet(dst, j); SColVal cv; - tColDataGetValue(srcCol, i, &cv); + code = tColDataGetValue(srcCol, i, &cv); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } code = tColDataAppendValue(dstCol, &cv); if (code) { goto _exit; @@ -3588,7 +3599,10 @@ static int32_t tColDataMerge(SArray **colArr) { SColData *dstCol = taosArrayGet(dst, j); SColVal cv; - tColDataGetValue(srcCol, i, &cv); + code = tColDataGetValue(srcCol, i, &cv); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } code = tColDataUpdateValue(dstCol, &cv, true); if (code) { goto _exit; diff --git a/source/common/test/dataformatTest.cpp b/source/common/test/dataformatTest.cpp index ebf91025bb..10c1077697 100644 --- a/source/common/test/dataformatTest.cpp +++ b/source/common/test/dataformatTest.cpp @@ -449,6 +449,20 @@ static void checkTSRow(const char **data, STSRow *row, STSchema *pTSchema) { checkSColVal(data[i], &colVal, pCol->type); } } +#ifndef WINDOWS +TEST(testCase, tColDataGetValue) { + SColData pColData = {0}; + SColVal pColVal = {0}; + ASSERT_NE(tColDataGetValue(&pColData, 0, &pColVal),0); + + pColData = {.flag = 8}; + pColVal = {0}; + ASSERT_NE(tColDataGetValue(&pColData, 0, &pColVal),0); + + pColData = {.nVal = 1, .flag = 8}; + ASSERT_NE(tColDataGetValue(&pColData, 0, &pColVal),0); +} +#endif TEST(testCase, AllNormTest) { int16_t nCols = 14; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 8e595f76c9..165437ed28 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -129,7 +129,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_USER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index 30a20cd68d..8fe7d3823a 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -325,7 +325,6 @@ int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8 ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); if (ctx->suidInfo == NULL) { return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); - ; } taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index c0c4c4a5a3..a2b6194375 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -796,7 +796,8 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* sourceIdx++; } else if (pCol->cid == pColData->info.colId) { for (int32_t i = 0; i < pCol->nVal; i++) { - tColDataGetValue(pCol, i, &colVal); + code = tColDataGetValue(pCol, i, &colVal); + TSDB_CHECK_CODE(code, line, END); code = doSetVal(pColData, i, &colVal); TSDB_CHECK_CODE(code, line, END); } @@ -937,7 +938,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData pCol = taosArrayGet(pCols, j); TQ_NULL_GO_TO_END(pCol); SColVal colVal = {0}; - tColDataGetValue(pCol, i, &colVal); + TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal)); PROCESS_VAL } @@ -961,7 +962,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx); TQ_NULL_GO_TO_END(pColData); SColVal colVal = {0}; - tColDataGetValue(pCol, i, &colVal); + TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal)); SET_DATA } diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 5151ea3958..4dff1d08d9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1715,7 +1715,7 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo uint8_t colType = tColDataGetBitValue(pColData, tRow.iRow); if (colType == 2) { SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type); - tColDataGetValue(pColData, tRow.iRow, &colVal); + TAOS_CHECK_GOTO(tColDataGetValue(pColData, tRow.iRow, &colVal), &lino, _exit); SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal}; if (!taosArrayPush(ctxArray, &updateCtx)) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index a9f3893b96..4ca140c03e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -123,7 +123,8 @@ static int32_t tColRowGetPriamyKeyDeepCopy(SBlockData* pBlock, int32_t irow, int pColData = &pBlock->aColData[slotId]; - tColDataGetValue(pColData, irow, &cv); + code = tColDataGetValue(pColData, irow, &cv); + TSDB_CHECK_CODE(code, lino, _end); pKey->numOfPKs = 1; pKey->pks[0].type = cv.value.type; @@ -1603,7 +1604,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro TSDB_CHECK_CODE(code, lino, _end); } else { // varchar/nchar type for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step) { - tColDataGetValue(pData, j, &cv); + code = tColDataGetValue(pData, j, &cv); + TSDB_CHECK_CODE(code, lino, _end); code = doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo); TSDB_CHECK_CODE(code, lino, _end); } @@ -5282,7 +5284,8 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotId[i]); if (pData->cid == pSupInfo->colId[i]) { - tColDataGetValue(pData, rowIndex, &cv); + code = tColDataGetValue(pData, rowIndex, &cv); + TSDB_CHECK_CODE(code, lino, _end); code = doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo); TSDB_CHECK_CODE(code, lino, _end); j += 1; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 16f6777765..88c6ac3d00 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -622,7 +622,9 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal * SColData *pColData = tBlockDataGetColData(pRow->pBlockData, pTColumn->colId); if (pColData) { - tColDataGetValue(pColData, pRow->iRow, pColVal); + if (tColDataGetValue(pColData, pRow->iRow, pColVal) != 0){ + tsdbError("failed to tColDataGetValue"); + } } else { *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type); } @@ -645,7 +647,9 @@ void tColRowGetPrimaryKey(SBlockData *pBlock, int32_t irow, SRowKey *key) { SColData *pColData = &pBlock->aColData[i]; if (pColData->cflag & COL_IS_KEY) { SColVal cv; - tColDataGetValue(pColData, irow, &cv); + if (tColDataGetValue(pColData, irow, &cv) != 0){ + break; + } key->pks[key->numOfPKs] = cv.value; key->numOfPKs++; } else { @@ -719,7 +723,9 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { } if (pIter->iColData <= pIter->pRow->pBlockData->nColData) { - tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv); + if (tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv) != 0){ + return NULL; + } ++pIter->iColData; return &pIter->cv; } else { @@ -1251,7 +1257,8 @@ static int32_t tBlockDataUpsertBlockRow(SBlockData *pBlockData, SBlockData *pBlo cv = COL_VAL_NONE(pColDataTo->cid, pColDataTo->type); if (flag == 0 && (code = tColDataAppendValue(pColDataTo, &cv))) goto _exit; } else { - tColDataGetValue(pColDataFrom, iRow, &cv); + code = tColDataGetValue(pColDataFrom, iRow, &cv); + if (code) goto _exit; if (flag) { code = tColDataUpdateValue(pColDataTo, &cv, flag > 0); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index f52a0c3aba..723fd14145 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -243,7 +243,7 @@ int32_t vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { code = TSDB_CODE_VND_HASH_MISMATCH; goto _exit; } else if (mer1.me.type == TSDB_CHILD_TABLE) { - metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_LOCK); + metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK); if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit; tstrncpy(cfgRsp.stbName, mer2.me.name, TSDB_TABLE_NAME_LEN); @@ -279,7 +279,8 @@ int32_t vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { } } else { vError("vnodeGetTableCfg get invalid table type:%d", mer1.me.type); - return TSDB_CODE_APP_ERROR; + code = TSDB_CODE_APP_ERROR; + goto _exit; } cfgRsp.numOfTags = schemaTag.nCols; diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index 61809b99e0..adf1b4f0d1 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -46,7 +46,7 @@ static FORCE_INLINE bool mJoinBlkReachThreshold(SMJoinOperatorInfo* pInfo, int64 return blkRows >= pInfo->ctx.mergeCtx.blkThreshold; } - return (pInfo->execInfo.resRows + blkRows) >= pInfo->ctx.mergeCtx.limit; + return (pInfo->execInfo.resRows + blkRows) >= pInfo->ctx.mergeCtx.limit || blkRows >= pInfo->ctx.mergeCtx.blkThreshold; } diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index e2693ff4a4..c6c091c630 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -44,10 +44,10 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval); static int tfileWriteFooter(TFileWriter* write); // handle file corrupt later -static int tfileReaderLoadHeader(TFileReader* reader); +static int tfileReaderLoadHeader(TFileReader* reader); static int32_t tfileReaderLoadFst(TFileReader* reader); -static int tfileReaderVerify(TFileReader* reader); -static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); +static int tfileReaderVerify(TFileReader* reader); +static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); static int32_t tfileGetFileList(const char* path, SArray** pResult); static int tfileRmExpireFile(SArray* result); @@ -106,6 +106,11 @@ TFileCache* tfileCacheCreate(SIndex* idx, const char* path) { SArray* files = NULL; int32_t code = tfileGetFileList(path, &files); + if (code != 0) { + indexError("failed to get file list since %s", tstrerror(code)); + goto End; + } + for (size_t i = 0; i < taosArrayGetSize(files); i++) { char* file = taosArrayGetP(files, i); @@ -1182,7 +1187,6 @@ _exception: TAOS_UNUSED(taosCloseDir(&pDir)); if (files != NULL) { taosArrayDestroyEx(files, tfileDestroyFileName); - taosArrayDestroy(files); } return code; } diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index e2069deefc..128fb50b8f 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1841,6 +1841,9 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* } if (TK_NK_QUESTION == pToken->type) { + if (!pCxt->pComCxt->isStmtBind && i != 0) { + return buildInvalidOperationMsg(&pCxt->msg, "not support mixed bind and non-bind values"); + } pCxt->isStmtBind = true; pStmt->usingTableProcessing = true; if (pCols->pColIndex[i] == tbnameIdx) { @@ -1874,6 +1877,9 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* return buildInvalidOperationMsg(&pCxt->msg, "not expected numOfBound"); } } else { + if (pCxt->pComCxt->isStmtBind) { + return buildInvalidOperationMsg(&pCxt->msg, "not support mixed bind and non-bind values"); + } if (pCols->pColIndex[i] < numOfCols) { const SSchema* pSchema = &pSchemas[pCols->pColIndex[i]]; SColVal* pVal = taosArrayGet(pStbRowsCxt->aColVals, pCols->pColIndex[i]); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index fa16cace25..42d7f44b62 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -954,7 +954,6 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t n int32_t vgId = pTask->pMeta->vgId; if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { - streamCleanBeforeQuitTmr(pTmrInfo, param); stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64 ", quit", id, vgId, pTmrInfo->launchChkptId); @@ -963,13 +962,11 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t n // active checkpoint info is cleared for now if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) { - streamCleanBeforeQuitTmr(pTmrInfo, param); stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr", id, vgId); return -1; } if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { - streamCleanBeforeQuitTmr(pTmrInfo, param); stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr", id, vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num); return -1; @@ -998,6 +995,7 @@ static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, in void* p = taosArrayPush(pTmp, &pInfo->upstreamTaskId); if (p == NULL) { stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId); + taosArrayDestroy(pTmp); return terrno; } else { stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, level, @@ -1047,13 +1045,13 @@ static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t } } -static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray* pNotRspList) { +static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray** pNotRspList) { SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; SArray* pList = pActiveInfo->pReadyMsgList; int32_t num = taosArrayGetSize(pList); int32_t vgId = pTask->pMeta->vgId; - int32_t checkpointId = pActiveInfo->activeId; + int64_t checkpointId = pActiveInfo->activeId; const char* id = pTask->id.idStr; int32_t notRsp = 0; @@ -1062,18 +1060,17 @@ static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray* return code; } - code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id); + code = doFindNotConfirmUpstream(pNotRspList, pList, num, vgId, pTask->info.taskLevel, id); if (code) { - streamCleanBeforeQuitTmr(pTmrInfo, param); stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr", id, tstrerror(code)); return code; } - notRsp = taosArrayGetSize(pNotRspList); + notRsp = taosArrayGetSize(*pNotRspList); if (notRsp == 0) { streamClearChkptReadyMsg(pActiveInfo); } else { - doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList); + doSendChkptReadyMsg(pTask, *pNotRspList, checkpointId, pList); } return code; @@ -1137,10 +1134,12 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) { } streamMutexLock(&pActiveInfo->lock); - code = chkptReadyMsgSendHelper(pTask, param, pNotRspList); + code = chkptReadyMsgSendHelper(pTask, param, &pNotRspList); streamMutexUnlock(&pActiveInfo->lock); if (code != TSDB_CODE_SUCCESS) { + streamCleanBeforeQuitTmr(pTmrInfo, param); + streamMetaReleaseTask(pTask->pMeta, pTask); taosArrayDestroy(pNotRspList); return; @@ -1176,7 +1175,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pList); if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { - stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d", id, num, + stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d not send chkpt-ready msg", id, num, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList)); streamMutexUnlock(&pActiveInfo->lock); return TSDB_CODE_STREAM_INTERNAL_ERROR; @@ -1200,7 +1199,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id); } } else { - stError("s-task:%s failed to prepare the checkpoint-ready msg, try nex time in 10s", id); + stError("s-task:%s failed to prepare the checkpoint-ready msg, try next time in 10s", id); } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 85f287f301..89cb4153fe 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -915,8 +915,7 @@ int32_t streamResumeTask(SStreamTask* pTask) { while (1) { code = doStreamExecTask(pTask); if (code) { - stError("s-task:%s failed to exec stream task, code:%s", id, tstrerror(code)); - return code; + stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code)); } // check if continue streamMutexLock(&pTask->lock); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index fbde104f4e..4862a4b963 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -422,13 +422,11 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) { SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) { SyncIndex minMatchIndex = SYNC_INDEX_INVALID; - if (pSyncNode->peersNum > 0) { - minMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0])); - } - - for (int32_t i = 1; i < pSyncNode->peersNum; ++i) { + for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i])); - if (matchIndex < minMatchIndex) { + if (minMatchIndex == SYNC_INDEX_INVALID) { + minMatchIndex = matchIndex; + } else if (matchIndex > 0 && matchIndex < minMatchIndex) { minMatchIndex = matchIndex; } } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 43a2ff6a23..4d47de98b4 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -208,28 +208,22 @@ static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { SWalFileInfo tmpInfo; tmpInfo.firstVer = ver; TAOS_UNUSED(taosThreadRwlockRdlock(&pWal->mutex)); - SWalFileInfo *gloablPRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); - if (gloablPRet == NULL) { + SWalFileInfo *globalRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); + if (globalRet == NULL) { wError("failed to find WAL log file with ver:%" PRId64, ver); TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); } - SWalFileInfo *pRet = taosMemoryMalloc(sizeof(SWalFileInfo)); - if (pRet == NULL) { - wError("failed to allocate memory for localRet"); - TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - TAOS_RETURN(terrno); - } - TAOS_MEMCPY(pRet, gloablPRet, sizeof(SWalFileInfo)); + SWalFileInfo ret; + TAOS_MEMCPY(&ret, globalRet, sizeof(SWalFileInfo)); TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - if (pReader->curFileFirstVer != pRet->firstVer) { + if (pReader->curFileFirstVer != ret.firstVer) { // error code was set inner - TAOS_CHECK_RETURN_WITH_FREE(walReadChangeFile(pReader, pRet->firstVer), pRet); + TAOS_CHECK_RETURN(walReadChangeFile(pReader, ret.firstVer)); } // error code was set inner - TAOS_CHECK_RETURN_WITH_FREE(walReadSeekFilePos(pReader, pRet->firstVer, ver), pRet); - taosMemoryFree(pRet); + TAOS_CHECK_RETURN(walReadSeekFilePos(pReader, ret.firstVer, ver)); wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, ver); pReader->curVersion = ver; @@ -437,15 +431,15 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { seeked = true; continue; } else { - wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", from log file since %s", - pReader->pWal->cfg.vgId, ver, terrstr()); - TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex)); - if (contLen < 0) { - TAOS_RETURN(terrno); + code = terrno; } else { - TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); + code = TSDB_CODE_WAL_FILE_CORRUPTED; } + wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", from log file since %s", + pReader->pWal->cfg.vgId, ver, tstrerror(code)); + TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex)); + TAOS_RETURN(code); } } @@ -478,15 +472,15 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { } if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, cryptedBodyLen)) != cryptedBodyLen) { - wError("vgId:%d, failed to read WAL record body, index:%" PRId64 ", from log file since %s", - pReader->pWal->cfg.vgId, ver, terrstr()); - TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex)); - if (contLen < 0) { - TAOS_RETURN(terrno); + code = terrno; } else { - TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); + code = TSDB_CODE_WAL_FILE_CORRUPTED; } + wError("vgId:%d, failed to read WAL record body, index:%" PRId64 ", from log file since %s", + pReader->pWal->cfg.vgId, ver, tstrerror(code)); + TAOS_UNUSED(taosThreadMutexUnlock(&pReader->mutex)); + TAOS_RETURN(code); } if (pReader->pHead->head.version != ver) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 855ac35c6c..feed297f71 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -909,12 +909,6 @@ const char* tstrerror(int32_t err) { (void)taosThreadOnce(&tsErrorInit, tsSortError); // this is a system errno - if ((err & 0x00ff0000) == 0x00ff0000) { - int32_t code = err & 0x0000ffff; - // strerror can handle any invalid code - // invalid code return Unknown error - return strerror(code); - } #ifdef WINDOWS if ((err & 0x01ff0000) == 0x01ff0000) { snprintf(WinAPIErrDesc, 256, "windows api error, code: 0x%08x", err & 0x0000ffff); @@ -924,6 +918,13 @@ const char* tstrerror(int32_t err) { return WinAPIErrDesc; } #endif + if ((err & 0x00ff0000) == 0x00ff0000) { + int32_t code = err & 0x0000ffff; + // strerror can handle any invalid code + // invalid code return Unknown error + return strerror(code); + } + int32_t s = 0; int32_t e = sizeof(errors) / sizeof(errors[0]); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 6fb9950931..38dd080ef5 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -249,6 +249,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/agg_group_NotReturnValue.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/td-32548.py +,,n,system-test,python3 ./test.py -f 2-query/large_data.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stddev_test.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stddev_test.py -Q 2 diff --git a/tests/pytest/util/cases.py b/tests/pytest/util/cases.py index 536b8f30d3..eee8809ad0 100644 --- a/tests/pytest/util/cases.py +++ b/tests/pytest/util/cases.py @@ -19,6 +19,7 @@ import inspect import importlib import traceback from util.log import * +import platform class TDCase: @@ -146,5 +147,42 @@ class TDCases: tdLog.notice("total %d Cluster test case(s) executed" % (runNum)) + def getTaosBenchmarkPath(self, tool="taosBenchmark"): + if (platform.system().lower() == 'windows'): + tool = tool + ".exe" + selfPath = os.path.dirname(os.path.realpath(__file__)) + if "community" in selfPath: + projPath = selfPath[: selfPath.find("community")] + else: + projPath = selfPath[: selfPath.find("tests")] + + paths = [] + for root, dirs, files in os.walk(projPath): + if (tool) in files: + rootRealPath = os.path.dirname(os.path.realpath(root)) + if "packaging" not in rootRealPath: + paths.append(os.path.join(root, tool)) + break + if len(paths) == 0: + tdLog.exit("taosBenchmark not found!") + return + else: + tdLog.info("taosBenchmark found in %s" % paths[0]) + return paths[0] + + def taosBenchmarkExec(self, param): + buildPath = tdCases.getTaosBenchmarkPath() + + if (platform.system().lower() == 'windows'): + cmdStr1 = ' mintty -h never %s %s '%(buildPath, param) + tdLog.info(cmdStr1) + os.system(cmdStr1) + else: + cmdStr1 = '%s %s &'%(buildPath, param) + tdLog.info(cmdStr1) + os.system(cmdStr1) + + time.sleep(5) + tdCases = TDCases() diff --git a/tests/script/api/stmt2-performance.c b/tests/script/api/stmt2-performance.c index a539affaf1..7ab20d873f 100644 --- a/tests/script/api/stmt2-performance.c +++ b/tests/script/api/stmt2-performance.c @@ -5,8 +5,8 @@ #include #include "taos.h" -int CTB_NUMS = 2; -int ROW_NUMS = 2; +int CTB_NUMS = 1; +int ROW_NUMS = 1; int CYC_NUMS = 2; void do_query(TAOS* taos, const char* sql) { @@ -217,7 +217,7 @@ int main() { exit(1); } - do_stmt(taos, "insert into db.stb(tbname,ts,b,t1,t2) values(?,?,?,?,?)"); + do_stmt(taos, "insert into `db`.`stb` (tbname,ts,b,t1,t2) values(?,?,?,?,?)"); // do_stmt(taos, "insert into db.? using db.stb tags(?,?)values(?,?)"); // do_taosc(taos); taos_close(taos); diff --git a/tests/system-test/1-insert/db_tb_name_check.py b/tests/system-test/1-insert/db_tb_name_check.py index e217bbe183..2f7afef6ec 100644 --- a/tests/system-test/1-insert/db_tb_name_check.py +++ b/tests/system-test/1-insert/db_tb_name_check.py @@ -78,9 +78,34 @@ class TDTestCase: tdSql.error(f'create table {sql} (ts timestamp,c0 int)') tdSql.execute(f'trim database `{dbname}`') tdSql.execute(f'drop database `{dbname}`') + + def tb_name_len_check(self): + dbname = tdCom.getLongName(10) + tdSql.execute(f'create database if not exists `{dbname}` vgroups 1 replica 1') + tdSql.execute(f'use `{dbname}`') + tdSql.execute(f'CREATE STABLE `test_csv` (`ts` TIMESTAMP, `c1` VARCHAR(2000), `c2` VARCHAR(2000)) TAGS (`c3` VARCHAR(2000))') + tbname = "test_csv_a12345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012" + tdSql.execute(f"INSERT INTO `{tbname}`\ + using `test_csv` (`c3`) tags('a12345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890')\ + (`ts`,`c1`,`c2`) values(1591060628000,'a','1');") + tdSql.query(f'select * from {tbname}') + tdSql.checkRows(1) + tdSql.execute(f'drop table {tbname}') + + tdSql.execute(f"INSERT INTO `{dbname}`.`{tbname}`\ + using `{dbname}`.`test_csv` (`c3`) tags('a12345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890')\ + (`ts`,`c1`,`c2`) values(1591060628000,'a','1');") + tdSql.query(f'select * from {tbname}') + tdSql.checkRows(1) + tdSql.execute(f'drop table {tbname}') + + tdSql.execute(f'trim database `{dbname}`') + tdSql.execute(f'drop database `{dbname}`') + def run(self): self.db_name_check() self.tb_name_check() + self.tb_name_len_check() def stop(self): tdSql.close() diff --git a/tests/system-test/2-query/large_data.py b/tests/system-test/2-query/large_data.py new file mode 100644 index 0000000000..2279c3ce70 --- /dev/null +++ b/tests/system-test/2-query/large_data.py @@ -0,0 +1,56 @@ +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), True) + + def prepare_data(self): + tdSql.execute("drop database if exists test;") + + tdCases.taosBenchmarkExec("-t 2 -n 1000000 -b int,float,nchar -y") + + while True: + tdSql.query("select ts from test.d0;") + num1 = tdSql.queryRows + tdSql.query("select ts from test.d1;") + num2 = tdSql.queryRows + if num1 == 1000000 and num2 == 1000000: + break + tdLog.info(f"waiting for data ready, d0: {num1}, d1: {num2}") + time.sleep(1) + + def ts5803(self): + tdSql.query("select d0.ts,d0.c1,d0.c2 from test.d0 join test.d1 on d0.ts=d1.ts;") + num1 = tdSql.queryRows + + tdSql.query("select d0.ts,d0.c1,d0.c2 from test.d0 join test.d1 on d0.ts=d1.ts limit 1000000;") + tdSql.checkRows(num1) + + tdSql.query("select d0.ts from test.d0 join test.d1 on d0.ts=d1.ts limit 1000000;") + tdSql.checkRows(num1) + + tdSql.query("select d0.ts,d0.c1,d0.c2 from test.d0 left join test.d1 on d0.ts=d1.ts;") + num1 = tdSql.queryRows + + tdSql.query("select d0.ts,d0.c1,d0.c2 from test.d0 left join test.d1 on d0.ts=d1.ts limit 1000000;") + tdSql.checkRows(num1) + + tdSql.query("select d0.ts from test.d0 left join test.d1 on d0.ts=d1.ts limit 1000000;") + tdSql.checkRows(num1) + + def run(self): + self.prepare_data() + self.ts5803() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())