diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 7f9b240f63..900bc88c41 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -25,7 +25,7 @@ extern "C" { #include "tlrucache.h" #include "tmsgcb.h" -#define SYNC_RESP_TTL_MS 10000000 +#define SYNC_RESP_TTL_MS 30000 #define SYNC_SPEED_UP_HB_TIMER 400 #define SYNC_SPEED_UP_AFTER_MS (1000 * 20) #define SYNC_SLOW_DOWN_RANGE 100 diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index d761813db1..87f753e6aa 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -72,6 +72,7 @@ typedef struct SRpcMsg { typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset); typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType); +typedef bool (*RpcFFfp)(tmsg_t msgType); typedef void (*RpcDfp)(void *ahandle); typedef struct SRpcInit { @@ -90,6 +91,9 @@ typedef struct SRpcInit { int32_t retryMaxInterval; // retry max interval int64_t retryMaxTimouet; + int32_t failFastThreshold; + int32_t failFastInterval; + int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int8_t encryption; // encrypt or not @@ -107,6 +111,8 @@ typedef struct SRpcInit { // destroy client ahandle; RpcDfp dfp; + // fail fast fp + RpcFFfp ffp; void *parent; } SRpcInit; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 93398d337d..7564d91330 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -231,10 +231,9 @@ void destroyTscObj(void *pObj) { tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj, pTscObj->pAppInfo->numOfConns); - int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); - if (0 == connNum) { - destroyAppInst(pTscObj->pAppInfo); - } + // In any cases, we should not free app inst here. Or an race condition rises. + /*int64_t connNum = */atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); + taosThreadMutexDestroy(&pTscObj->mutex); taosMemoryFree(pTscObj); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3bcfddb8b2..d1a3f38143 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -407,7 +407,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfQnodeQueryThreads = tsNumOfCores * 2; tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4); - if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 4, 1024, 0) != 0) return -1; // tsNumOfQnodeFetchThreads = tsNumOfCores / 2; // tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 12aba130d5..2383a09343 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -48,6 +48,11 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { return (*msgFp)(pWrapper->pMgmt, pMsg); } +static bool dmFailFastFp(tmsg_t msgType) { + // add more msg type later + return msgType == TDMT_SYNC_HEARTBEAT || msgType == TDMT_SYNC_APPEND_ENTRIES; +} + static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { SDnodeTrans *pTrans = &pDnode->trans; int32_t code = -1; @@ -260,6 +265,10 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.retryMaxInterval = tsRedirectMaxPeriod; rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; + rpcInit.failFastInterval = 1000; // interval threshold(ms) + rpcInit.failFastThreshold = 3; // failed threshold + rpcInit.ffp = dmFailFastFp; + pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { dError("failed to init dnode rpc client"); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index df25e44bee..0538d70101 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -383,9 +383,9 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { pGid->syncCanRead != pVload->syncCanRead) { mInfo( "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d " - "canRead:%d", + "canRead:%d, dnode:%d", pVgroup->vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, - syncStr(pVload->syncState), pVload->syncRestore, pVload->syncCanRead); + syncStr(pVload->syncState), pVload->syncRestore, pVload->syncCanRead, pDnode->id); pGid->syncState = pVload->syncState; pGid->syncRestore = pVload->syncRestore; pGid->syncCanRead = pVload->syncCanRead; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index b4ea8ee125..a6e52caf1f 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1126,34 +1126,61 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, } if (!force) { - mInfo("vgId:%d, will add 1 vnode", pVgroup->vgId); - if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1; - for (int32_t i = 0; i < newVg.replica - 1; ++i) { - if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1; - } - if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]) != 0) return -1; - if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1; - - mInfo("vgId:%d, will remove 1 vnode", pVgroup->vgId); - newVg.replica--; - SVnodeGid del = newVg.vnodeGid[vnIndex]; - newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica]; - memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid)); - { - SSdbRaw *pRaw = mndVgroupActionEncode(&newVg); - if (pRaw == NULL) return -1; - if (mndTransAppendRedolog(pTrans, pRaw) != 0) { - sdbFreeRaw(pRaw); - return -1; + if (newVg.replica == 1) { + mInfo("vgId:%d, will add 1 vnode, replca:1", pVgroup->vgId); + if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1; + for (int32_t i = 0; i < newVg.replica - 1; ++i) { + if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1; } - (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); - } + if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]) != 0) return -1; + if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1; - if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1; - for (int32_t i = 0; i < newVg.replica; ++i) { - if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1; + mInfo("vgId:%d, will remove 1 vnode, replca:2", pVgroup->vgId); + newVg.replica--; + SVnodeGid del = newVg.vnodeGid[vnIndex]; + newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica]; + memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid)); + { + SSdbRaw *pRaw = mndVgroupActionEncode(&newVg); + if (pRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRaw) != 0) { + sdbFreeRaw(pRaw); + return -1; + } + (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); + } + + if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1; + for (int32_t i = 0; i < newVg.replica; ++i) { + if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1; + } + if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1; + } else { // new replica == 3 + mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId); + if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1; + mInfo("vgId:%d, will remove 1 vnode, replca:4", pVgroup->vgId); + newVg.replica--; + SVnodeGid del = newVg.vnodeGid[vnIndex]; + newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica]; + memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid)); + { + SSdbRaw *pRaw = mndVgroupActionEncode(&newVg); + if (pRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRaw) != 0) { + sdbFreeRaw(pRaw); + return -1; + } + (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); + } + + if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1; + for (int32_t i = 0; i < newVg.replica; ++i) { + if (i == vnIndex) continue; + if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1; + } + if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1; + if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1; } - if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1; } else { mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId); if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 5dd9293118..a4581f5472 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -214,6 +214,7 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); static int32_t doBuildDataBlock(STsdbReader* pReader); static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); +static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } @@ -2477,8 +2478,39 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) { - setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); - break; + + int32_t nextIndex = -1; + SBlockIndex bIndex = {0}; + bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &bIndex); + if (!hasNeighbor) { // do nothing + setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); + break; + } + + if (overlapWithNeighborBlock(pBlock, &bIndex, pReader->order)) { // load next block + SReaderStatus* pStatus = &pReader->status; + SDataBlockIter* pBlockIter = &pStatus->blockIter; + + // 1. find the next neighbor block in the scan block list + SFileDataBlockInfo fb = {.uid = pBlockInfo->uid, .tbBlockIdx = nextIndex}; + int32_t neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb); + + // 2. remove it from the scan block list + setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step); + + // 3. load the neighbor block, and set it to be the currently accessed file data block + code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid); + if (code != TSDB_CODE_SUCCESS) { + setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); + break; + } + + // 4. check the data values + initBlockDumpInfo(pReader, pBlockIter); + } else { + setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); + break; + } } } } @@ -2888,7 +2920,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { } // set the correct start position in case of the first/last file block, according to the query time window -static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { +void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { SDataBlk* pBlock = getCurrentBlock(pBlockIter); SReaderStatus* pStatus = &pReader->status; @@ -4055,6 +4087,31 @@ void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64 } } + +static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, + SColumnDataAgg* pTsAgg) { + // do fill all null column value SMA info + int32_t i = 0, j = 0; + int32_t size = (int32_t) taosArrayGetSize(pSup->pColAgg); + taosArrayInsert(pSup->pColAgg, 0, pTsAgg); + + while (j < numOfCols && i < size) { + SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i); + if (pAgg->colId == pSup->colId[j]) { + i += 1; + j += 1; + } else if (pAgg->colId < pSup->colId[j]) { + i += 1; + } else if (pSup->colId[j] < pAgg->colId) { + if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) { + SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows}; + taosArrayInsert(pSup->pColAgg, i ,&nullColAgg); + } + j += 1; + } + } +} + int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg ***pBlockSMA, bool* allHave) { int32_t code = 0; *allHave = false; @@ -4110,6 +4167,10 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg ***pBlockS pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg)); } + // do fill all null column value SMA info + doFillNullColSMA(pSup, pBlock->nRow, numOfCols, pTsAgg); + + i = 0, j = 0; while (j < numOfCols && i < size) { SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i); if (pAgg->colId == pSup->colId[j]) { @@ -4119,15 +4180,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg ***pBlockS } else if (pAgg->colId < pSup->colId[j]) { i += 1; } else if (pSup->colId[j] < pAgg->colId) { - if (pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID) { - pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg; - } else { - // all date in this block are null - SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = pBlock->nRow}; - taosArrayPush(pSup->pColAgg, &nullColAgg); - - pResBlock->pBlockAgg[pSup->slotId[j]] = taosArrayGetLast(pSup->pColAgg); - } + ASSERT(pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID); + pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg; j += 1; } } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 2032cb9fce..fc3cfbd0f6 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -439,7 +439,9 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray* goto end; } } - removeInvalidTable(uidList, tags); + if (suid != 0) { + removeInvalidTable(uidList, tags); + } int32_t rows = taosArrayGetSize(uidList); if (rows == 0) { @@ -1604,7 +1606,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols); pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo)); - pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t)*pCond->numOfCols); + pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols); if (pCond->colList == NULL || pCond->pSlotList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFreeClear(pCond->colList); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index bd22e864cd..043cc396b5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1580,6 +1580,7 @@ void destroyOperatorInfo(SOperatorInfo* pOperator) { int32_t optrDefaultBufFn(SOperatorInfo* pOperator) { if (pOperator->blocking) { ASSERT(0); + return 0; } else { return 0; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 04f5f4ecfe..0f35d3778d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1218,27 +1218,56 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS if (rows == 0) { return TSDB_CODE_SUCCESS; } + + SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX); + SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); + SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX); + + uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData; + ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); + TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; + TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; + int64_t version = pSrcBlock->info.version - 1; + + if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) { + uint64_t srcUid = srcUidData[0]; + TSKEY startTs = srcStartTsCol[0]; + TSKEY endTs = srcEndTsCol[0]; + SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, version); + printDataBlock(pPreRes, "pre res"); + blockDataCleanup(pSrcBlock); + int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex); + rows = pPreRes->info.rows; + + for (int32_t i = 0; i < rows; i++) { + uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i); + appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid, + &groupId, NULL); + } + printDataBlock(pSrcBlock, "new delete"); + } + uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData; + srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; + srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; + srcUidData = (uint64_t*)pSrcUidCol->pData; + int32_t code = blockDataEnsureCapacity(pDestBlock, rows); if (code != TSDB_CODE_SUCCESS) { return code; } - SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); - SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX); - SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); - uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData; - SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX); - uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData; - ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); - TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; - TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX); SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); - int64_t version = pSrcBlock->info.version - 1; for (int32_t i = 0; i < rows;) { uint64_t srcUid = srcUidData[i]; uint64_t groupId = srcGp[i]; @@ -1653,13 +1682,6 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]); colDataAppend(pGpCol, i, (const char*)&groupId, false); } - } else { - // SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uidCol[i], startTsCol, ts, maxVersion); - // if (!pPreRes || pPreRes->info.rows == 0) { - // return 0; - // } - // ASSERT(pPreRes->info.rows == 1); - // return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, 0); } } @@ -3032,8 +3054,10 @@ void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* if (pSupp->dbNameSlotId != -1) { ASSERT(strlen(dbName)); SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId); - char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - strncpy(varDataVal(varDbName), dbName, strlen(dbName)); + + char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + tstrncpy(varDataVal(varDbName), dbName, TSDB_DB_NAME_LEN); + varDataSetLen(varDbName, strlen(dbName)); colDataAppend(colInfoData, 0, varDbName, false); } @@ -3042,7 +3066,7 @@ void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId); if (strlen(stbName) != 0) { char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - strncpy(varDataVal(varStbName), stbName, strlen(stbName)); + strncpy(varDataVal(varStbName), stbName, TSDB_TABLE_NAME_LEN); varDataSetLen(varStbName, strlen(stbName)); colDataAppend(colInfoData, 0, varStbName, false); } else { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index a9145ac69d..111da6d6ba 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -504,7 +504,7 @@ static int32_t getNumOfElems(SqlFunctionCtx* pCtx) { */ SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - if (pInput->colDataSMAIsSet && pInput->totalRows == pInput->numOfRows) { + if (pInput->colDataSMAIsSet && pInput->totalRows == pInput->numOfRows && !IS_VAR_DATA_TYPE(pInputCol->info.type)) { numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull; ASSERT(numOfElem >= 0); } else { diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 39118910f1..48df7e36a3 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -377,7 +377,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) #define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) #define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) \ - (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)))) + (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)) || (_task)->redirectCtx.inRedirect)) #define SCH_REDIRECT_MSGTYPE(_msgType) \ ((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || \ (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH) diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 560ce0bdc3..c23b461b47 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -156,6 +156,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa SCH_RET(schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode)); } + pTask->redirectCtx.inRedirect = false; + switch (msgType) { case TDMT_VND_COMMIT_RSP: { SCH_ERR_JRET(rspCode); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index cd69834924..8e60222ca6 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -362,17 +362,12 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, } pCtx->totalTimes++; + pCtx->roundTimes++; if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) { pCtx->roundTotal = pEpSet->numOfEps; - pCtx->roundTimes = 0; - - pTask->delayExecMs = 0; - - goto _return; } - pCtx->roundTimes++; if (pCtx->roundTimes >= pCtx->roundTotal) { int64_t nowTs = taosGetTimestampMs(); diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index cf63df371d..3bd94dbab5 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -46,6 +46,7 @@ typedef struct SyncClientRequest { uint32_t originalRpcType; // origin RpcMsg msgType uint64_t seqNum; bool isWeak; + int16_t reserved; uint32_t dataLen; // origin RpcMsg.contLen char data[]; // origin RpcMsg.pCont } SyncClientRequest; @@ -56,6 +57,7 @@ typedef struct SyncClientRequestReply { uint32_t msgType; int32_t errCode; SRaftId leaderHint; + int16_t reserved; } SyncClientRequestReply; typedef struct SyncRequestVote { @@ -68,6 +70,7 @@ typedef struct SyncRequestVote { SyncTerm term; SyncIndex lastLogIndex; SyncTerm lastLogTerm; + int16_t reserved; } SyncRequestVote; typedef struct SyncRequestVoteReply { @@ -79,6 +82,7 @@ typedef struct SyncRequestVoteReply { // private data SyncTerm term; bool voteGranted; + int16_t reserved; } SyncRequestVoteReply; typedef struct SyncAppendEntries { @@ -94,6 +98,7 @@ typedef struct SyncAppendEntries { SyncTerm prevLogTerm; SyncIndex commitIndex; SyncTerm privateTerm; + int16_t reserved; uint32_t dataLen; char data[]; } SyncAppendEntries; @@ -111,6 +116,7 @@ typedef struct SyncAppendEntriesReply { SyncIndex matchIndex; SyncIndex lastSendIndex; int64_t startTime; + int16_t reserved; } SyncAppendEntriesReply; typedef struct SyncHeartbeat { @@ -126,6 +132,7 @@ typedef struct SyncHeartbeat { SyncTerm privateTerm; SyncTerm minMatchIndex; int64_t timeStamp; + int16_t reserved; } SyncHeartbeat; typedef struct SyncHeartbeatReply { @@ -140,6 +147,7 @@ typedef struct SyncHeartbeatReply { SyncTerm privateTerm; int64_t startTime; int64_t timeStamp; + int16_t reserved; } SyncHeartbeatReply; typedef struct SyncPreSnapshot { @@ -151,6 +159,7 @@ typedef struct SyncPreSnapshot { // private data SyncTerm term; + int16_t reserved; } SyncPreSnapshot; typedef struct SyncPreSnapshotReply { @@ -163,6 +172,7 @@ typedef struct SyncPreSnapshotReply { // private data SyncTerm term; SyncIndex snapStart; + int16_t reserved; } SyncPreSnapshotReply; typedef struct SyncApplyMsg { @@ -190,6 +200,7 @@ typedef struct SyncSnapshotSend { SSyncCfg lastConfig; int64_t startTime; int32_t seq; + int16_t reserved; uint32_t dataLen; char data[]; } SyncSnapshotSend; @@ -208,6 +219,7 @@ typedef struct SyncSnapshotRsp { int32_t ack; int32_t code; SyncIndex snapBeginIndex; // when ack = SYNC_SNAPSHOT_SEQ_BEGIN, it's valid + int16_t reserved; } SyncSnapshotRsp; typedef struct SyncLeaderTransfer { diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index cf933d98b2..9c39fc1e8e 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -19,6 +19,7 @@ #include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncReplication.h" +#include "syncRespMgr.h" #include "syncUtil.h" static void syncNodeCleanConfigIndex(SSyncNode* ths) { @@ -85,11 +86,9 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) { } } -#if 0 if (!syncNodeIsMnode(ths)) { syncRespClean(ths->pSyncRespMgr); } -#endif return 0; } diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 833937aa41..57aba67b1d 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -57,10 +57,14 @@ typedef struct { int32_t retryMaxInterval; // retry max interval int32_t retryMaxTimouet; + int32_t failFastThreshold; + int32_t failFastInterval; + void (*cfp)(void* parent, SRpcMsg*, SEpSet*); bool (*retry)(int32_t code, tmsg_t msgType); bool (*startTimer)(int32_t code, tmsg_t msgType); void (*destroyFp)(void* ahandle); + bool (*failFastFp)(tmsg_t msgType); int index; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index c6a5cfdc95..0eac12f7c5 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -56,11 +56,15 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->retryMaxInterval = pInit->retryMaxInterval; pRpc->retryMaxTimouet = pInit->retryMaxTimouet; + pRpc->failFastThreshold = pInit->failFastThreshold; + pRpc->failFastInterval = pInit->failFastInterval; + // register callback handle pRpc->cfp = pInit->cfp; pRpc->retry = pInit->rfp; pRpc->startTimer = pInit->tfp; pRpc->destroyFp = pInit->dfp; + pRpc->failFastFp = pInit->ffp; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; if (pRpc->numOfThreads <= 0) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7339d487d1..50594f1a5a 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -84,6 +84,8 @@ typedef struct SCliThrd { SHashObj* fqdn2ipCache; SCvtAddr cvtAddr; + SHashObj* failFastCache; + SCliMsg* stopMsg; bool quit; @@ -96,6 +98,13 @@ typedef struct SCliObj { SCliThrd** pThreadObj; } SCliObj; +typedef struct { + int32_t reinit; + int64_t timestamp; + int32_t count; + int32_t threshold; + int64_t interval; +} SFailFastItem; // conn pool // add expire timeout and capacity limit static void* createConnPool(int size); @@ -853,7 +862,7 @@ void cliSend(SCliConn* pConn) { int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); if (status != 0) { - tGError("%s conn %p failed to sent msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType), + tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType), uv_err_name(status)); cliHandleExcept(pConn); } @@ -863,7 +872,6 @@ _RETURN: } void cliConnCb(uv_connect_t* req, int status) { - // impl later SCliConn* pConn = req->data; SCliThrd* pThrd = pConn->hostThrd; @@ -875,7 +883,33 @@ void cliConnCb(uv_connect_t* req, int status) { } if (status != 0) { - tError("%s conn %p failed to connect server:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); + SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); + STrans* pTransInst = pThrd->pTransInst; + + tError("%s msg %s failed to send, conn %p failed to connect to %s:%d, reason: %s", CONN_GET_INST_LABEL(pConn), + pMsg ? TMSG_INFO(pMsg->msg.msgType) : 0, pConn, pConn->ip, pConn->port, uv_strerror(status)); + if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) && + (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { + char* ip = pConn->ip; + uint32_t port = pConn->port; + char key[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, ip, port); + + SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); + int64_t cTimestamp = taosGetTimestampMs(); + if (item != NULL) { + int32_t elapse = cTimestamp - item->timestamp; + if (elapse >= 0 && elapse <= pTransInst->failFastInterval) { + item->count++; + } else { + item->count = 1; + item->timestamp = cTimestamp; + } + } else { + SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; + taosHashPut(pThrd->failFastCache, key, strlen(key), &item, sizeof(SFailFastItem)); + } + } cliHandleExcept(pConn); return; } @@ -1027,6 +1061,25 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { return; } + if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { + char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); + uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + char key[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, ip, port); + + SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); + if (item != NULL) { + int32_t elapse = (int32_t)(taosGetTimestampMs() - item->timestamp); + if (item->count >= pTransInst->failFastThreshold && (elapse >= 0 && elapse <= pTransInst->failFastInterval)) { + STraceId* trace = &(pMsg->msg.info.traceId); + tGTrace("%s, msg %s cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label, + TMSG_INFO(pMsg->msg.msgType), ip, port, item->count, elapse); + destroyCmsg(pMsg); + return; + } + } + } + bool ignore = false; SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); if (ignore == true) { @@ -1299,6 +1352,8 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pThrd->quit = false; return pThrd; } @@ -1325,6 +1380,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->loop); taosHashCleanup(pThrd->fqdn2ipCache); + taosHashCleanup(pThrd->failFastCache); taosMemoryFree(pThrd); } diff --git a/source/os/src/osTime.c b/source/os/src/osTime.c index 68dfba14e9..cd4324a592 100644 --- a/source/os/src/osTime.c +++ b/source/os/src/osTime.c @@ -572,7 +572,7 @@ int32_t taosClockGetTime(int clock_id, struct timespec *pTS) { offsetInitFinished = true; } else { while (!offsetInitFinished) - ; // Ensure initialization is completed. + ; // Ensure initialization is completed. } GetSystemTimeAsFileTime(&f); diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index fc9d90c985..be1db74f1a 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -496,7 +496,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons if (!osLogSpaceAvailable()) return; if (!(dflag & DEBUG_FILE) && !(dflag & DEBUG_SCREEN)) return; - char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE); + char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE); int32_t len = taosBuildLogHead(buffer, flags); va_list argpointer; diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index 74aa7baf0b..60df188d7b 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -79,6 +79,9 @@ int64_t bpTs; #define IS_NUMERIC_TYPE(_t) ((IS_SIGNED_NUMERIC_TYPE(_t)) || (IS_UNSIGNED_NUMERIC_TYPE(_t)) || (IS_FLOAT_TYPE(_t))) typedef struct { + bool singleTbInsert; + int32_t singleTbIdx; + int64_t* tsData; bool* boolData; int8_t* tinyData; @@ -116,6 +119,7 @@ int insertMBMETest4(TAOS_STMT *stmt, TAOS *taos); int insertMPMETest1(TAOS_STMT *stmt, TAOS *taos); int insertAUTOTest1(TAOS_STMT *stmt, TAOS *taos); int insertAUTOTest2(TAOS_STMT *stmt, TAOS *taos); +int insertAUTOTest3(TAOS_STMT *stmt, TAOS *taos); int queryColumnTest(TAOS_STMT *stmt, TAOS *taos); int queryMiscTest(TAOS_STMT *stmt, TAOS *taos); @@ -130,6 +134,7 @@ typedef struct { int32_t *colList; // full table column list int32_t testType; int32_t autoCreateTbl; + bool duplicateValue; bool fullCol; int32_t (*runFn)(TAOS_STMT*, TAOS*); int32_t tblNum; @@ -143,46 +148,47 @@ typedef struct { } CaseCfg; CaseCfg gCase[] = { - {"insert:MBSE0-FULL", tListLen(shortColList), shortColList, TTYPE_INSERT, 0, true, insertMBSETest1, 1, 10, 10, 0, 0, 0, 1, -1}, - {"insert:MBSE0-FULL", tListLen(shortColList), shortColList, TTYPE_INSERT, 0, true, insertMBSETest1, 10, 100, 10, 0, 0, 0, 1, -1}, + {"insert:MBSE0-FULL", tListLen(shortColList), shortColList, TTYPE_INSERT, 0, false, true, insertMBSETest1, 1, 10, 10, 0, 0, 0, 1, -1}, + {"insert:MBSE0-FULL", tListLen(shortColList), shortColList, TTYPE_INSERT, 0, false, true, insertMBSETest1, 10, 100, 10, 0, 0, 0, 1, -1}, - {"insert:MBSE1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, true, insertMBSETest1, 10, 10, 2, 0, 0, 0, 1, -1}, - {"insert:MBSE1-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, insertMBSETest1, 10, 10, 2, 12, 0, 0, 1, -1}, - {"insert:MBSE1-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, insertMBSETest1, 10, 10, 2, 2, 0, 0, 1, -1}, + {"insert:MBSE1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBSETest1, 10, 10, 2, 0, 0, 0, 1, -1}, + {"insert:MBSE1-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBSETest1, 10, 10, 2, 12, 0, 0, 1, -1}, + {"insert:MBSE1-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBSETest1, 10, 10, 2, 2, 0, 0, 1, -1}, - {"insert:MBSE2-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, true, insertMBSETest2, 10, 10, 2, 0, 0, 0, 1, -1}, - {"insert:MBSE2-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, insertMBSETest2, 10, 10, 2, 12, 0, 0, 1, -1}, - {"insert:MBSE2-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, insertMBSETest2, 10, 10, 2, 2, 0, 0, 1, -1}, + {"insert:MBSE2-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBSETest2, 10, 10, 2, 0, 0, 0, 1, -1}, + {"insert:MBSE2-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBSETest2, 10, 10, 2, 12, 0, 0, 1, -1}, + {"insert:MBSE2-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBSETest2, 10, 10, 2, 2, 0, 0, 1, -1}, - {"insert:MBME1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, true, insertMBMETest1, 10, 10, 2, 0, 0, 0, 1, -1}, - {"insert:MBME1-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, insertMBMETest1, 10, 10, 2, 12, 0, 0, 1, -1}, - {"insert:MBME1-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, insertMBMETest1, 10, 10, 2, 2, 0, 0, 1, -1}, + {"insert:MBME1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBMETest1, 10, 10, 2, 0, 0, 0, 1, -1}, + {"insert:MBME1-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest1, 10, 10, 2, 12, 0, 0, 1, -1}, + {"insert:MBME1-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest1, 10, 10, 2, 2, 0, 0, 1, -1}, // 11 - {"insert:MBME2-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, true, insertMBMETest2, 10, 10, 2, 0, 0, 0, 1, -1}, - {"insert:MBME2-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, insertMBMETest2, 10, 10, 2, 12, 0, 0, 1, -1}, - {"insert:MBME2-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, insertMBMETest2, 10, 10, 2, 2, 0, 0, 1, -1}, + {"insert:MBME2-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBMETest2, 10, 10, 2, 0, 0, 0, 1, -1}, + {"insert:MBME2-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest2, 10, 10, 2, 12, 0, 0, 1, -1}, + {"insert:MBME2-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest2, 10, 10, 2, 2, 0, 0, 1, -1}, - {"insert:MBME3-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, true, insertMBMETest3, 10, 10, 2, 0, 0, 0, 1, -1}, - {"insert:MBME3-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, insertMBMETest3, 10, 10, 2, 12, 0, 0, 1, -1}, - {"insert:MBME3-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, insertMBMETest3, 10, 10, 2, 2, 0, 0, 1, -1}, + {"insert:MBME3-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBMETest3, 10, 10, 2, 0, 0, 0, 1, -1}, + {"insert:MBME3-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest3, 10, 10, 2, 12, 0, 0, 1, -1}, + {"insert:MBME3-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest3, 10, 10, 2, 2, 0, 0, 1, -1}, - {"insert:MBME4-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, true, insertMBMETest4, 10, 10, 2, 0, 0, 0, 1, -1}, - {"insert:MBME4-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, insertMBMETest4, 10, 10, 2, 12, 0, 0, 1, -1}, - {"insert:MBME4-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, insertMBMETest4, 10, 10, 2, 2, 0, 0, 1, -1}, + {"insert:MBME4-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBMETest4, 10, 10, 2, 0, 0, 0, 1, -1}, + {"insert:MBME4-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest4, 10, 10, 2, 12, 0, 0, 1, -1}, + {"insert:MBME4-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest4, 10, 10, 2, 2, 0, 0, 1, -1}, - {"insert:MPME1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, true, insertMPMETest1, 10, 10, 2, 0, 0, 0, 1, -1}, - {"insert:MPME1-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, insertMPMETest1, 10, 10, 2, 12, 0, 0, 1, -1}, + {"insert:MPME1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMPMETest1, 10, 10, 2, 0, 0, 0, 1, -1}, + {"insert:MPME1-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMPMETest1, 10, 10, 2, 12, 0, 0, 1, -1}, // 22 - {"insert:AUTO1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 1, true, insertAUTOTest1, 10, 10, 2, 0, 0, 0, 1, -1}, - {"insert:AUTO1-TBEXISTS", tListLen(fullColList), fullColList, TTYPE_INSERT, 3, true, insertAUTOTest2, 10, 10, 2, 0, 0, 0, 1, -1}, + {"insert:AUTO1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 1, false, true, insertAUTOTest1, 10, 10, 2, 0, 0, 0, 1, -1}, + {"insert:AUTO2-TBEXISTS", tListLen(fullColList), fullColList, TTYPE_INSERT, 3, false, true, insertAUTOTest2, 10, 10, 2, 0, 0, 0, 1, -1}, +// {"insert:AUTO3-NTB", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, true, true, insertAUTOTest3, 10, 10, 2, 0, 0, 0, 1, -1}, - {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, queryColumnTest, 10, 10, 1, 3, 0, 0, 1, 2}, - {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, queryMiscTest, 10, 10, 1, 3, 0, 0, 1, 2}, + {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryColumnTest, 10, 10, 1, 3, 0, 0, 1, 2}, + {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryMiscTest, 10, 10, 1, 3, 0, 0, 1, 2}, -// {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, queryColumnTest, 1, 10, 1, 1, 0, 0, 1, 2}, -// {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, queryMiscTest, 2, 10, 1, 1, 0, 0, 1, 2}, +// {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryColumnTest, 1, 10, 1, 1, 0, 0, 1, 2}, +// {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryMiscTest, 2, 10, 1, 1, 0, 0, 1, 2}, }; @@ -232,7 +238,7 @@ CaseCtrl gCaseCtrl = { .numericParam = false, .rowNum = 0, .bindColNum = 0, - .bindTagNum = 14, + .bindTagNum = 0, .bindRowNum = 0, .bindColTypeNum = 0, .bindColTypeList = NULL, @@ -244,7 +250,7 @@ CaseCtrl gCaseCtrl = { .funcIdxList = NULL, .checkParamNum = false, .runTimes = 0, - .caseIdx = 23, + .caseIdx = 24, .caseNum = 1, .caseRunIdx = -1, .caseRunNum = -1, @@ -382,7 +388,11 @@ bool colExists(TAOS_MULTI_BIND* pBind, int32_t dataType) { void generateInsertSQL(BindData *data) { int32_t len = 0; if (gCurCase->tblNum > 1) { - len = sprintf(data->sql, "insert into ? "); + if (data->singleTbInsert) { + len = sprintf(data->sql, "insert into %s%d ", bpTbPrefix, data->singleTbIdx); + } else { + len = sprintf(data->sql, "insert into ? "); + } } else { len = sprintf(data->sql, "insert into %s0 ", bpTbPrefix); } @@ -938,7 +948,14 @@ int32_t prepareInsertData(BindData *data) { } for (int32_t i = 0; i < allRowNum; ++i) { - data->tsData[i] = bpTs++; + if (gCurCase->duplicateValue) { + data->tsData[i] = bpTs; + if (i % 2 == 1) { + bpTs++; + } + } else { + data->tsData[i] = bpTs++; + } data->boolData[i] = (bool)(i % 2); data->tinyData[i] = (int8_t)i; data->utinyData[i] = (uint8_t)(i+1); @@ -1251,6 +1268,9 @@ void bpCheckParamNum(TAOS_STMT *stmt) { void bpCheckAffectedRows(TAOS_STMT *stmt, int32_t times) { int32_t rows = taos_stmt_affected_rows(stmt); int32_t insertNum = gCurCase->rowNum * gCurCase->tblNum * times; + if (gCurCase->duplicateValue) { + insertNum /= 2; + } if (insertNum != rows) { printf("!!!affected rows %d mis-match with insert num %d\n", rows, insertNum); exit(1); @@ -2014,6 +2034,65 @@ int insertAUTOTest2(TAOS_STMT *stmt, TAOS *taos) { return 0; } +/* normal table [prepare [bind add exec]] */ +int insertAUTOTest3(TAOS_STMT *stmt, TAOS *taos) { + int32_t loop = 0; + + while (gCurCase->bindColNum > 0) { + BindData data = {0}; + data.singleTbInsert = true; + prepareInsertData(&data); + + int32_t bindTimes = gCurCase->rowNum/gCurCase->bindRowNum; + for (int32_t t = 0; t< gCurCase->tblNum; ++t) { + data.singleTbIdx = t; + generateInsertSQL(&data); + + int code = taos_stmt_prepare(stmt, data.sql, 0); + if (code != 0){ + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + exit(1); + } + + for (int32_t b = 0; b bindColNum + b*gCurCase->bindColNum)) { + exit(1); + } + + if (taos_stmt_add_batch(stmt)) { + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + exit(1); + } + + if (taos_stmt_execute(stmt) != 0) { + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + exit(1); + } + } + } + + bpCheckIsInsert(stmt, 1); + + destroyData(&data); + + gCurCase->bindColNum -= 2; + gCurCase->fullCol = false; + loop++; + } + + bpCheckAffectedRows(stmt, loop); + + gExecLoopTimes = loop; + + return 0; +} + /* select * from table */ int queryColumnTest(TAOS_STMT *stmt, TAOS *taos) { @@ -2160,7 +2239,7 @@ void prepareCheckResult(TAOS *taos, bool silent) { sprintf(buf, "%s%d", bpTbPrefix, 0); } - prepareCheckResultImpl(taos, buf, gCaseCtrl.printRes, gCurCase->rowNum * gExecLoopTimes, silent); + prepareCheckResultImpl(taos, buf, gCaseCtrl.printRes, gCurCase->duplicateValue ? (gCurCase->rowNum * gExecLoopTimes / 2) : (gCurCase->rowNum * gExecLoopTimes), silent); } gExecLoopTimes = 1; @@ -2749,6 +2828,7 @@ void runAll(TAOS *taos) { printf("%s Begin\n", gCaseCtrl.caseCatalog); runCaseList(taos); +#if 0 strcpy(gCaseCtrl.caseCatalog, "Micro DB precision Test"); printf("%s Begin\n", gCaseCtrl.caseCatalog); gCaseCtrl.precision = TIME_PRECISION_MICRO; @@ -2805,6 +2885,8 @@ void runAll(TAOS *taos) { runCaseList(taos); gCaseCtrl.bindColNum = 0; +#endif + /* strcpy(gCaseCtrl.caseCatalog, "Bind Col Type Test"); printf("%s Begin\n", gCaseCtrl.caseCatalog); diff --git a/tests/script/tsim/vnode/replica3_repeat.sim b/tests/script/tsim/vnode/replica3_repeat.sim index 8efba515ae..10e18d01d0 100644 --- a/tests/script/tsim/vnode/replica3_repeat.sim +++ b/tests/script/tsim/vnode/replica3_repeat.sim @@ -85,6 +85,7 @@ print ======== step3 system sh/exec.sh -n dnode2 -s stop sleep 3000 +$t = 0 $x = 0 loop: @@ -126,8 +127,8 @@ print ======== step8 $lastRows = $data00 print ======== loop Times $x -if $x < 2 then - $x = $x + 1 +if $t < 2 then + $t = $t + 1 goto loop endi @@ -138,4 +139,4 @@ system sh/exec.sh -n dnode4 -s stop -x SIGINT system sh/exec.sh -n dnode5 -s stop -x SIGINT system sh/exec.sh -n dnode6 -s stop -x SIGINT system sh/exec.sh -n dnode7 -s stop -x SIGINT -system sh/exec.sh -n dnode8 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode8 -s stop -x SIGINT