From 764365047d07bc83ffff50c10bac1ad179665fd1 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 16 Jan 2024 09:28:45 +0800 Subject: [PATCH 01/22] fix: stream scan core due to table end index introduced in 1 null row for empty group --- source/libs/executor/inc/executorInt.h | 2 ++ source/libs/executor/src/executor.c | 10 ++++++---- source/libs/executor/src/scanoperator.c | 19 ++++++++++++------- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c3f47cde9d..2ea23e73f2 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -273,6 +273,8 @@ typedef struct STableScanInfo { int32_t tableStartIndex; // current group scan start int32_t tableEndIndex; // current group scan end int32_t currentGroupIndex; // current group index of groupOffset + int32_t currentGroupId; + int32_t currentTable; int8_t scanMode; int8_t assignBlockUid; uint8_t countState; // empty table count state diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index eb84cb0639..1bccc514e4 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1264,6 +1264,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0); uid = pTableInfo->uid; ts = INT64_MIN; + pScanInfo->currentTable = 0; pScanInfo->tableEndIndex = 0; } else { taosRUnLockLatch(&pTaskInfo->lock); @@ -1278,16 +1279,17 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pInfo->pTableScanOp->resultInfo.totalRows = 0; // start from current accessed position - // we cannot start from the pScanInfo->tableEndIndex, since the commit offset may cause the rollback of the start + // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start // position, let's find it from the beginning. index = tableListFind(pTableListInfo, uid, 0); taosRUnLockLatch(&pTaskInfo->lock); if (index >= 0) { + pScanInfo->currentTable = index; pScanInfo->tableEndIndex = index; } else { qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid, - numOfTables, pScanInfo->tableEndIndex, id); + numOfTables, pScanInfo->currentTable, id); terrno = TSDB_CODE_PAR_INTERNAL_ERROR; return -1; } @@ -1310,12 +1312,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s", - uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->tableEndIndex, numOfTables, id); + uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); } else { pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1); pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond); qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s", - uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->tableEndIndex, numOfTables, id); + uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); } // restore the key value diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3ed5128858..6736d6e137 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -939,7 +939,7 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); - if (pInfo->tableEndIndex + 1 >= numOfTables) { + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { setOperatorCompleted(pOperator); if (pOperator->dynamicTask) { taosArrayClear(pInfo->base.pTableListInfo->pTableList); @@ -978,9 +978,9 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { int32_t num = 0; STableKeyInfo* pList = NULL; - if (pInfo->tableEndIndex == -1) { + if (pInfo->currentGroupId == -1) { int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); - if (pInfo->tableEndIndex + 1 == numOfTables) { + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { setOperatorCompleted(pOperator); return NULL; } @@ -1034,7 +1034,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } if (pOperator->status == OP_EXEC_DONE) { - pInfo->tableEndIndex = -1; + pInfo->currentGroupId = -1; pOperator->status = OP_OPENED; SSDataBlock* result = NULL; while (true) { @@ -1059,23 +1059,24 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } // if no data, switch to next table and continue scan + pInfo->currentTable++; pInfo->tableEndIndex++; taosRLockLatch(&pTaskInfo->lock); numOfTables = tableListGetSize(pInfo->base.pTableListInfo); - if (pInfo->tableEndIndex >= numOfTables) { + if (pInfo->currentTable >= numOfTables) { qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo)); taosRUnLockLatch(&pTaskInfo->lock); return NULL; } - tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableEndIndex); + tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); taosRUnLockLatch(&pTaskInfo->lock); pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1); qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables, - pInfo->tableEndIndex, numOfTables, GET_TASKID(pTaskInfo)); + pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo)); pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); pInfo->scanTimes = 0; @@ -1167,6 +1168,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, if (code != TSDB_CODE_SUCCESS) { goto _error; } + + pInfo->currentGroupId = -1; pInfo->tableEndIndex = -1; pInfo->currentGroupIndex = -1; @@ -1264,6 +1267,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint6 pTableScanInfo->base.cond.startVersion = 0; pTableScanInfo->base.cond.endVersion = ver; pTableScanInfo->scanTimes = 0; + pTableScanInfo->currentGroupId = -1; pTableScanInfo->tableEndIndex = -1; pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); pTableScanInfo->base.dataReader = NULL; @@ -2167,6 +2171,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pInfo->pTableScanOp->status = OP_OPENED; pTSInfo->scanTimes = 0; + pTSInfo->currentGroupId = -1; pTSInfo->tableEndIndex = -1; } From 9a7ef7fa182f3a350060bd55ba63743ae9e7bf15 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 16 Jan 2024 15:08:50 +0800 Subject: [PATCH 02/22] fix: reuse tableListGetGroupList --- source/libs/executor/src/executil.c | 4 --- source/libs/executor/src/scanoperator.c | 39 ++++++++++++------------- 2 files changed, 18 insertions(+), 25 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 1da7818c3c..19f33d2420 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -646,10 +646,6 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf } } - if (initRemainGroups) { - pTableListInfo->numOfOuputGroups = taosHashGetSize(pTableListInfo->remainGroups); - } - if (tsTagFilterCache) { tableList = taosArrayDup(pTableListInfo->pTableList, NULL); pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6736d6e137..36a7f4c183 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -657,33 +657,17 @@ void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, int32_t* size) { - pInfo->tableStartIndex = pInfo->tableEndIndex + 1; + tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size); - STableListInfo* pTableListInfo = pInfo->base.pTableListInfo; - int32_t numOfTables = tableListGetSize(pTableListInfo); - STableKeyInfo* pStart = (STableKeyInfo*)tableListGetInfo(pTableListInfo, pInfo->tableStartIndex); + pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo); - if (pTableListInfo->oneTableForEachGroup) { - pInfo->tableEndIndex = pInfo->tableStartIndex; - } else if (pTableListInfo->groupOffset) { - pInfo->currentGroupIndex++; - if (pInfo->currentGroupIndex + 1 < pTableListInfo->numOfOuputGroups) { - pInfo->tableEndIndex = pTableListInfo->groupOffset[pInfo->currentGroupIndex + 1] - 1; - } else { - pInfo->tableEndIndex = numOfTables - 1; - } - } else { - pInfo->tableEndIndex = numOfTables - 1; - } + pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1); if (!pInfo->needCountEmptyTable) { pInfo->countState = TABLE_COUNT_STATE_END; } else { pInfo->countState = TABLE_COUNT_STATE_SCAN; } - - *pKeyInfo = pStart; - *size = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; } void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) { @@ -984,7 +968,8 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { setOperatorCompleted(pOperator); return NULL; } - + + initNextGroupScan(pInfo, &pList, &num); ASSERT(pInfo->base.dataReader == NULL); @@ -1006,16 +991,28 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { if (pOperator->dynamicTask) { result->info.id.groupId = result->info.id.uid; } + uInfo("slzhou 1 %lu, %lu, %lu", result->info.id.groupId, result->info.id.uid, result->info.rows); return result; } while (true) { result = startNextGroupScan(pOperator); if (result || pOperator->status == OP_EXEC_DONE) { + if (result) { + uInfo("slzhou 2 %lu, %lu, %lu", result->info.id.groupId, result->info.id.uid, result->info.rows); + } + else { + uInfo("slzhou 2 null block" ); + } return result; } } - + if (result) { + uInfo("slzhou 3 %lu, %lu, %lu", result->info.id.groupId, result->info.id.uid, result->info.rows); + } + else { + uInfo("slzhou 3 null block" ); + } return result; } From de6b559ab9bd1302bd4fb610868abd0cff524edc Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 16 Jan 2024 15:21:20 +0800 Subject: [PATCH 03/22] fix: remove some uage of table end index --- source/libs/executor/inc/executorInt.h | 1 - source/libs/executor/src/executor.c | 2 -- source/libs/executor/src/scanoperator.c | 3 --- 3 files changed, 6 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 2ea23e73f2..007eab40f8 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -272,7 +272,6 @@ typedef struct STableScanInfo { SSampleExecInfo sample; // sample execution info int32_t tableStartIndex; // current group scan start int32_t tableEndIndex; // current group scan end - int32_t currentGroupIndex; // current group index of groupOffset int32_t currentGroupId; int32_t currentTable; int8_t scanMode; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1bccc514e4..e872c87237 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1265,7 +1265,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT uid = pTableInfo->uid; ts = INT64_MIN; pScanInfo->currentTable = 0; - pScanInfo->tableEndIndex = 0; } else { taosRUnLockLatch(&pTaskInfo->lock); qError("no table in table list, %s", id); @@ -1286,7 +1285,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT if (index >= 0) { pScanInfo->currentTable = index; - pScanInfo->tableEndIndex = index; } else { qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid, numOfTables, pScanInfo->currentTable, id); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 36a7f4c183..103166e3e7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1057,7 +1057,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // if no data, switch to next table and continue scan pInfo->currentTable++; - pInfo->tableEndIndex++; taosRLockLatch(&pTaskInfo->lock); numOfTables = tableListGetSize(pInfo->base.pTableListInfo); @@ -1169,7 +1168,6 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->currentGroupId = -1; pInfo->tableEndIndex = -1; - pInfo->currentGroupIndex = -1; pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false; @@ -2169,7 +2167,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTSInfo->scanTimes = 0; pTSInfo->currentGroupId = -1; - pTSInfo->tableEndIndex = -1; } if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) { From 449c1631ef04224b67193a6069044d35f63f3317 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 17 Jan 2024 13:34:37 +0800 Subject: [PATCH 04/22] fix: add numOfOutputGroups back when group order scan --- source/libs/executor/src/executil.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 19f33d2420..fd34a98eca 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2155,6 +2155,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* return code; } + if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList); + if (groupSort || pScanNode->groupOrderScan) { code = sortTableGroup(pTableListInfo); } From eb2899d44b287a0a6e1b6647bf385baa37a10859 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 17 Jan 2024 09:50:57 +0000 Subject: [PATCH 05/22] not check mem on arm --- tests/parallel_test/cases.task | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 1c2f5ec23c..63ca624d62 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -3,7 +3,13 @@ #NA,NA,y or n,script,./test.sh -f tsim/user/basic.sim #unit-test + +archOs=$(arch) +if [[ $archOs =~ "aarch64" ]]; then +,,n,unit-test,bash test.sh +else ,,y,unit-test,bash test.sh +fi # # army-test From f1fb7f5c147d112e20f84a07ca4f40d99e25cd7e Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 18 Jan 2024 08:43:36 +0800 Subject: [PATCH 06/22] fix: change num of output groups when cout empty group is needed --- source/libs/executor/src/executil.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index fd34a98eca..033cda43f2 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2138,6 +2138,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pTableListInfo->numOfOuputGroups = numOfTables; } else if (groupByTbname && pScanNode->groupOrderScan){ pTableListInfo->numOfOuputGroups = numOfTables; + } else if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) { + pTableListInfo->numOfOuputGroups = numOfTables; } else { pTableListInfo->numOfOuputGroups = 1; } From a1f7169b639af0f8852d4d953aa15895903a40ae Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 18 Jan 2024 09:52:04 +0800 Subject: [PATCH 07/22] fix: remove uInfo --- source/libs/executor/src/scanoperator.c | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 103166e3e7..09f4f6ac3c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -991,28 +991,16 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { if (pOperator->dynamicTask) { result->info.id.groupId = result->info.id.uid; } - uInfo("slzhou 1 %lu, %lu, %lu", result->info.id.groupId, result->info.id.uid, result->info.rows); return result; } while (true) { result = startNextGroupScan(pOperator); if (result || pOperator->status == OP_EXEC_DONE) { - if (result) { - uInfo("slzhou 2 %lu, %lu, %lu", result->info.id.groupId, result->info.id.uid, result->info.rows); - } - else { - uInfo("slzhou 2 null block" ); - } return result; } } - if (result) { - uInfo("slzhou 3 %lu, %lu, %lu", result->info.id.groupId, result->info.id.uid, result->info.rows); - } - else { - uInfo("slzhou 3 null block" ); - } + return result; } From 94ab74cc7f3d1738381aecc125347439744b65e6 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 17 Jan 2024 14:47:54 +0800 Subject: [PATCH 08/22] clone task id --- source/libs/executor/src/exchangeoperator.c | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 2797cb2d82..140b390913 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -37,7 +37,7 @@ typedef struct SSourceDataInfo { int64_t startTime; int32_t code; EX_SOURCE_STATUS status; - const char* taskId; + char* taskId; SArray* pSrcUidList; int32_t srcOpType; bool tableSeq; @@ -260,14 +260,16 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const return TSDB_CODE_SUCCESS; } + int32_t len = strlen(id); for (int32_t i = 0; i < numOfSources; ++i) { SSourceDataInfo dataInfo = {0}; dataInfo.status = EX_SOURCE_DATA_NOT_READY; - dataInfo.taskId = id; + dataInfo.taskId = taosMemoryCalloc(1, len); + memcpy(dataInfo.taskId, id, len); dataInfo.index = i; SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); if (pDs == NULL) { - taosArrayDestroy(pInfo->pSourceDataInfo); + taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo); return TSDB_CODE_OUT_OF_MEMORY; } } @@ -368,6 +370,7 @@ void freeBlock(void* pParam) { void freeSourceDataInfo(void* p) { SSourceDataInfo* pInfo = (SSourceDataInfo*)p; taosMemoryFreeClear(pInfo->pRsp); + taosMemoryFreeClear(pInfo->taskId); } void doDestroyExchangeOperatorInfo(void* param) { @@ -782,7 +785,10 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic if (pIdx->inUseIdx < 0) { SSourceDataInfo dataInfo = {0}; dataInfo.status = EX_SOURCE_DATA_NOT_READY; - dataInfo.taskId = GET_TASKID(pOperator->pTaskInfo); + char* pTaskId = GET_TASKID(pOperator->pTaskInfo); + int32_t len = strlen(pTaskId); + dataInfo.taskId = taosMemoryCalloc(1, len); + memcpy(dataInfo.taskId, pTaskId, len); dataInfo.index = pIdx->srcIdx; dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL); dataInfo.srcOpType = pBasicParam->srcOpType; From 1309e56a95eb0a38cd69254276da608c89d7cfe1 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 17 Jan 2024 16:52:47 +0800 Subject: [PATCH 09/22] use strncpy --- source/libs/executor/src/exchangeoperator.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 140b390913..ac103ebbc1 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -260,12 +260,12 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const return TSDB_CODE_SUCCESS; } - int32_t len = strlen(id); + int32_t len = strlen(id) + 1; for (int32_t i = 0; i < numOfSources; ++i) { SSourceDataInfo dataInfo = {0}; dataInfo.status = EX_SOURCE_DATA_NOT_READY; dataInfo.taskId = taosMemoryCalloc(1, len); - memcpy(dataInfo.taskId, id, len); + strncpy(dataInfo.taskId, id, len); dataInfo.index = i; SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); if (pDs == NULL) { @@ -786,9 +786,9 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic SSourceDataInfo dataInfo = {0}; dataInfo.status = EX_SOURCE_DATA_NOT_READY; char* pTaskId = GET_TASKID(pOperator->pTaskInfo); - int32_t len = strlen(pTaskId); + int32_t len = strlen(pTaskId) + 1; dataInfo.taskId = taosMemoryCalloc(1, len); - memcpy(dataInfo.taskId, pTaskId, len); + strncpy(dataInfo.taskId, pTaskId, len); dataInfo.index = pIdx->srcIdx; dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL); dataInfo.srcOpType = pBasicParam->srcOpType; From 7986951f440e9cbf585d1238ab077881b1b24e7f Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 18 Jan 2024 09:22:39 +0800 Subject: [PATCH 10/22] use taskid of operator --- source/libs/executor/inc/executorInt.h | 1 + source/libs/executor/src/exchangeoperator.c | 15 +++++++-------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c3f47cde9d..72da249f50 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -200,6 +200,7 @@ typedef struct SExchangeInfo { uint64_t self; SLimitInfo limitInfo; int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo + char* pTaskId; } SExchangeInfo; typedef struct SScanInfo { diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index ac103ebbc1..06dd43e170 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -37,7 +37,7 @@ typedef struct SSourceDataInfo { int64_t startTime; int32_t code; EX_SOURCE_STATUS status; - char* taskId; + const char* taskId; SArray* pSrcUidList; int32_t srcOpType; bool tableSeq; @@ -261,11 +261,12 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const } int32_t len = strlen(id) + 1; + pInfo->pTaskId = taosMemoryCalloc(1, len); + strncpy(pInfo->pTaskId, id, len); for (int32_t i = 0; i < numOfSources; ++i) { SSourceDataInfo dataInfo = {0}; dataInfo.status = EX_SOURCE_DATA_NOT_READY; - dataInfo.taskId = taosMemoryCalloc(1, len); - strncpy(dataInfo.taskId, id, len); + dataInfo.taskId = pInfo->pTaskId; dataInfo.index = i; SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); if (pDs == NULL) { @@ -370,7 +371,6 @@ void freeBlock(void* pParam) { void freeSourceDataInfo(void* p) { SSourceDataInfo* pInfo = (SSourceDataInfo*)p; taosMemoryFreeClear(pInfo->pRsp); - taosMemoryFreeClear(pInfo->taskId); } void doDestroyExchangeOperatorInfo(void* param) { @@ -386,6 +386,8 @@ void doDestroyExchangeOperatorInfo(void* param) { tSimpleHashCleanup(pExInfo->pHashSources); tsem_destroy(&pExInfo->ready); + taosMemoryFreeClear(pExInfo->pTaskId); + taosMemoryFreeClear(param); } @@ -785,10 +787,7 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic if (pIdx->inUseIdx < 0) { SSourceDataInfo dataInfo = {0}; dataInfo.status = EX_SOURCE_DATA_NOT_READY; - char* pTaskId = GET_TASKID(pOperator->pTaskInfo); - int32_t len = strlen(pTaskId) + 1; - dataInfo.taskId = taosMemoryCalloc(1, len); - strncpy(dataInfo.taskId, pTaskId, len); + dataInfo.taskId = pExchangeInfo->pTaskId; dataInfo.index = pIdx->srcIdx; dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL); dataInfo.srcOpType = pBasicParam->srcOpType; From 84810185c733b8ba691ae1496aba7e0794647a70 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 18 Jan 2024 11:16:18 +0800 Subject: [PATCH 11/22] fix: skip acked msg in snapshotReSend --- source/libs/sync/src/syncSnapshot.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 578e6798e0..d9c8bb21ac 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -345,9 +345,9 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) { SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size]; - ASSERT(pBlk && !pBlk->acked); + ASSERT(pBlk); int64_t nowMs = taosGetTimestampMs(); - if (nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) { + if (pBlk->acked || nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) { continue; } if (syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0) != 0) { From 06c9b8974bf3d0eaaa144d5aa20c71dc7727c66b Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 18 Jan 2024 04:06:25 +0000 Subject: [PATCH 12/22] rebuild stream backen from locak --- source/libs/stream/src/streamBackendRocksdb.c | 33 ++++++++----------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c8f944071f..bcf289a019 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -500,34 +500,27 @@ int32_t backendCopyFiles(char* src, char* dst) { // return 0; } int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { - int32_t code = -1; - int32_t len = strlen(defaultPath) + 32; - char* tmp = taosMemoryCalloc(1, len); - sprintf(tmp, "%s%s", defaultPath, "_tmp"); - - if (taosIsDir(tmp)) taosRemoveDir(tmp); - if (taosIsDir(defaultPath)) taosRenameFile(defaultPath, tmp); - - if (taosIsDir(chkpPath) && isValidCheckpoint(chkpPath)) { - if (taosIsDir(tmp)) { - taosRemoveDir(tmp); - } + int32_t code = 0; + if (taosIsDir(defaultPath)) { + taosRemoveDir(defaultPath); taosMkDir(defaultPath); + + stInfo("succ to clear stream backend %s", defaultPath); + } + if (taosIsDir(chkpPath) && isValidCheckpoint(chkpPath)) { code = backendCopyFiles(chkpPath, defaultPath); if (code != 0) { - stError("failed to restart stream backend from %s, reason: %s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno))); + taosRemoveDir(defaultPath); + taosMkDir(defaultPath); + + stError("failed to restart stream backend from %s, reason: %s, start to restart from empty path: %s", chkpPath, + tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath); + code = 0; } else { stInfo("start to restart stream backend at checkpoint path: %s", chkpPath); } } - if (code != 0) { - if (taosIsDir(defaultPath)) taosRemoveDir(defaultPath); - if (taosIsDir(tmp)) taosRenameFile(tmp, defaultPath); - } else { - taosRemoveDir(tmp); - } - taosMemoryFree(tmp); return code; } From ea6b39c01e272e784064730b69951263ecbab30d Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 18 Jan 2024 06:53:16 +0000 Subject: [PATCH 13/22] del rsma case --- tests/parallel_test/cases.task | 4 +++- tests/script/win-test-file | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 694af127af..acfe0f2a16 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1245,7 +1245,9 @@ e ,,y,script,./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim ,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim ,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQueryDelete.sim -,,y,script,./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim + +### refactor stream backend, open case after rsma refactored +#,,y,script,./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim ,,y,script,./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim ,,n,script,./test.sh -f tsim/valgrind/checkError1.sim ,,n,script,./test.sh -f tsim/valgrind/checkError2.sim diff --git a/tests/script/win-test-file b/tests/script/win-test-file index 744415c89f..b9f250927f 100644 --- a/tests/script/win-test-file +++ b/tests/script/win-test-file @@ -280,7 +280,9 @@ ./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim ./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim ./test.sh -f tsim/sma/rsmaCreateInsertQueryDelete.sim -./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim + +### refactor stream backend, open case after rsma refactored +#./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim ./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim ./test.sh -f tsim/valgrind/checkError1.sim ./test.sh -f tsim/valgrind/checkError2.sim From bb715c249d770c2160d7058ab3b5b22ac5e89a13 Mon Sep 17 00:00:00 2001 From: zk66214 Date: Thu, 18 Jan 2024 16:58:55 +0800 Subject: [PATCH 14/22] delete duplicated cases --- tests/system-test/2-query/orderBy.py | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/tests/system-test/2-query/orderBy.py b/tests/system-test/2-query/orderBy.py index dedc57eab3..1cf1478439 100644 --- a/tests/system-test/2-query/orderBy.py +++ b/tests/system-test/2-query/orderBy.py @@ -278,19 +278,19 @@ class TDTestCase: def queryOrderByAgg(self): - tdSql.query("SELECT COUNT(*) FROM t1 order by COUNT(*)") + tdSql.no_error("SELECT COUNT(*) FROM t1 order by COUNT(*)") - tdSql.query("SELECT COUNT(*) FROM t1 order by last(c2)") + tdSql.no_error("SELECT COUNT(*) FROM t1 order by last(c2)") - tdSql.query("SELECT c1 FROM t1 order by last(ts)") + tdSql.no_error("SELECT c1 FROM t1 order by last(ts)") - tdSql.query("SELECT ts FROM t1 order by last(ts)") + tdSql.no_error("SELECT ts FROM t1 order by last(ts)") - tdSql.query("SELECT last(ts), ts, c1 FROM t1 order by 2") + tdSql.no_error("SELECT last(ts), ts, c1 FROM t1 order by 2") - tdSql.query("SELECT ts, last(ts) FROM t1 order by last(ts)") + tdSql.no_error("SELECT ts, last(ts) FROM t1 order by last(ts)") - tdSql.query(f"SELECT * FROM t1 order by last(ts)") + tdSql.no_error(f"SELECT * FROM t1 order by last(ts)") tdSql.query(f"SELECT last(ts) as t2, ts FROM t1 order by 1") tdSql.checkRows(1) @@ -302,6 +302,18 @@ class TDTestCase: tdSql.error(f"SELECT last(ts) as t2, ts FROM t1 order by last(t2)") + def queryOrderByAmbiguousName(self): + tdSql.error(sql="select c1 as name, c2 as name, c3 from t1 order by name", expectErrInfo='ambiguous', + fullMatched=False) + + tdSql.error(sql="select c1, c2 as c1, c3 from t1 order by c1", expectErrInfo='ambiguous', fullMatched=False) + + tdSql.error(sql='select last(ts), last(c1) as name ,last(c2) as name,last(c3) from t1 order by name', + expectErrInfo='ambiguous', fullMatched=False) + + tdSql.no_error("select c1 as name, c2 as c1, c3 from t1 order by c1") + + tdSql.no_error('select c1 as name from (select c1, c2 as name from st) order by name') # run def run(self): @@ -317,6 +329,8 @@ class TDTestCase: # agg self.queryOrderByAgg() + # td-28332 + self.queryOrderByAmbiguousName() # stop def stop(self): From b83bb8c1507d29bf4e0ef7a2a83e451ec8aa7d16 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Thu, 18 Jan 2024 17:20:31 +0800 Subject: [PATCH 15/22] fix: last_row error --- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- tests/system-test/2-query/last_row.py | 46 +++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index d86219542f..cc0bf2b774 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -941,7 +941,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } if(lastrowTmpIndexArray != NULL) { - mergeLastCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds); + mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds); for(int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) { taosArrayInsert(pTmpColArray, *(int32_t*)taosArrayGet(lastrowTmpIndexArray, i), taosArrayGet(lastrowTmpColArray, i)); } diff --git a/tests/system-test/2-query/last_row.py b/tests/system-test/2-query/last_row.py index 6469b3ea54..a7223db1cd 100644 --- a/tests/system-test/2-query/last_row.py +++ b/tests/system-test/2-query/last_row.py @@ -861,11 +861,55 @@ class TDTestCase: self.support_super_table_test() + def initLastRowDelayTest(self, dbname="db"): + tdSql.execute(f"drop database if exists {dbname} ") + create_db_sql = f"create database if not exists {dbname} keep 3650 duration 1000 cachemodel 'NONE' REPLICA 1" + tdSql.execute(create_db_sql) + + tdSql.execute(f"use {dbname}") + tdSql.execute(f'create stable {dbname}.st(ts timestamp, v_int int, v_float float) TAGS (ctname varchar(32))') + + tdSql.execute(f"create table {dbname}.ct1 using st tags('ct1')") + tdSql.execute(f"create table {dbname}.ct2 using st tags('ct2')") + + tdSql.execute(f"insert into {dbname}.st(tbname,ts,v_float, v_int) values('ct1',1630000000000,86,86)") + tdSql.execute(f"insert into {dbname}.st(tbname,ts,v_float, v_int) values('ct1',1630000021255,59,59)") + tdSql.execute(f'flush database {dbname}') + tdSql.execute(f'select last(*) from {dbname}.st') + tdSql.execute(f'select last_row(*) from {dbname}.st') + tdSql.execute(f"insert into {dbname}.st(tbname,ts) values('ct1',1630000091255)") + tdSql.execute(f'flush database {dbname}') + tdSql.execute(f'select last(*) from {dbname}.st') + tdSql.execute(f'select last_row(*) from {dbname}.st') + tdSql.execute(f'alter database {dbname} cachemodel "both"') + tdSql.query(f'select last(*) from {dbname}.st') + tdSql.checkData(0 , 1 , 59) + + tdSql.query(f'select last_row(*) from {dbname}.st') + tdSql.checkData(0 , 1 , None) + tdSql.checkData(0 , 2 , None) + + tdLog.printNoPrefix("========== delay test init success ==============") + + def lastRowDelayTest(self, dbname="db"): + tdLog.printNoPrefix("========== delay test start ==============") + + tdSql.execute(f"use {dbname}") + + tdSql.query(f'select last(*) from {dbname}.st') + tdSql.checkData(0 , 1 , 59) + + tdSql.query(f'select last_row(*) from {dbname}.st') + tdSql.checkData(0 , 1 , None) + tdSql.checkData(0 , 2 , None) + def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring # tdSql.prepare() tdLog.printNoPrefix("==========step1:create table ==============") + self.initLastRowDelayTest("DELAYTEST") + # cache_last 0 self.prepare_datas("'NONE' ") self.prepare_tag_datas("'NONE'") @@ -890,6 +934,8 @@ class TDTestCase: self.insert_datas_and_check_abs(self.tb_nums,self.row_nums,self.time_step,"'BOTH'") self.basic_query() + self.lastRowDelayTest("DELAYTEST") + def stop(self): tdSql.close() From cd6026c6a9f34b3b603244355f8e677b0cbf490d Mon Sep 17 00:00:00 2001 From: facetosea <25808407@qq.com> Date: Thu, 18 Jan 2024 17:31:23 +0800 Subject: [PATCH 16/22] docs: keepColumnName notes --- docs/en/14-reference/12-config/index.md | 7 ++++--- docs/zh/14-reference/12-config/index.md | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/docs/en/14-reference/12-config/index.md b/docs/en/14-reference/12-config/index.md index 65c48f9190..c1abfd3e39 100755 --- a/docs/en/14-reference/12-config/index.md +++ b/docs/en/14-reference/12-config/index.md @@ -226,9 +226,10 @@ Please note the `taoskeeper` needs to be installed and running to create the `lo | Attribute | Description | | ------------- | --------------------------------------------------------------------------------------------------------------- | | Applicable | Client only | -| Meaning | When the Last, First, LastRow function is queried, whether the returned column name contains the function name. | -| Value Range | 0 means including the function name, 1 means not including the function name. | -| Default Value | 0 | +| Meaning | When the Last, First, and LastRow functions are queried and no alias is specified, the alias is automatically set to the column name (excluding the function name). Therefore, if the order by clause refers to the column name, it will automatically refer to the function corresponding to the column. | +| Value Range | 1 means automatically setting the alias to the column name (excluding the function name), 0 means not automatically setting the alias. | +| Default Value | 0 | +| Notes | When multiple of the above functions act on the same column at the same time and no alias is specified, if the order by clause refers to the column name, column selection ambiguous will occur because the aliases of multiple columns are the same. | ## Locale Parameters diff --git a/docs/zh/14-reference/12-config/index.md b/docs/zh/14-reference/12-config/index.md index 27a9618107..4d47f0771c 100755 --- a/docs/zh/14-reference/12-config/index.md +++ b/docs/zh/14-reference/12-config/index.md @@ -215,9 +215,10 @@ taos -C | 属性 | 说明 | | -------- | ----------------------------------------------------------- | | 适用范围 | 仅客户端适用 | -| 含义 | Last、First、LastRow 函数查询时,返回的列名是否包含函数名。 | -| 取值范围 | 0 表示包含函数名,1 表示不包含函数名。 | -| 缺省值 | 0 | +| 含义 | Last、First、LastRow 函数查询且未指定别名时,自动设置别名为列名(不含函数名),因此 order by 子句如果引用了该列名将自动引用该列对应的函数 | +| 取值范围 | 1 表示自动设置别名为列名(不包含函数名), 0 表示不自动设置别名。 | +| 缺省值 | 0 | +| 补充说明 | 当同时出现多个上述函数作用于同一列且未指定别名时,如果 order by 子句引用了该列名,将会因为多列别名相同引发列选择冲突| ### countAlwaysReturnValue From afbe4d99a8f6ad0517907c233b51f0860578b244 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Thu, 18 Jan 2024 18:16:18 +0800 Subject: [PATCH 17/22] fix: case --- tests/system-test/2-query/last_row.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/system-test/2-query/last_row.py b/tests/system-test/2-query/last_row.py index a7223db1cd..0744b3bae5 100644 --- a/tests/system-test/2-query/last_row.py +++ b/tests/system-test/2-query/last_row.py @@ -866,11 +866,12 @@ class TDTestCase: create_db_sql = f"create database if not exists {dbname} keep 3650 duration 1000 cachemodel 'NONE' REPLICA 1" tdSql.execute(create_db_sql) + time.sleep(3) tdSql.execute(f"use {dbname}") tdSql.execute(f'create stable {dbname}.st(ts timestamp, v_int int, v_float float) TAGS (ctname varchar(32))') - tdSql.execute(f"create table {dbname}.ct1 using st tags('ct1')") - tdSql.execute(f"create table {dbname}.ct2 using st tags('ct2')") + tdSql.execute(f"create table {dbname}.ct1 using {dbname}.st tags('ct1')") + tdSql.execute(f"create table {dbname}.ct2 using {dbname}.st tags('ct2')") tdSql.execute(f"insert into {dbname}.st(tbname,ts,v_float, v_int) values('ct1',1630000000000,86,86)") tdSql.execute(f"insert into {dbname}.st(tbname,ts,v_float, v_int) values('ct1',1630000021255,59,59)") From ff8908dedebfffc72cb798a7f4a46de496d52bdd Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 18 Jan 2024 18:30:21 +0800 Subject: [PATCH 18/22] ignore invalid state --- source/libs/stream/src/streamSessionState.c | 4 ++++ source/libs/stream/src/streamState.c | 1 + tests/script/tsim/stream/partitionby1.sim | 2 ++ 3 files changed, 7 insertions(+) diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 2cb77c60dc..1f991d309f 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -90,6 +90,7 @@ SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKe SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); pNewPos->needFree = true; + pNewPos->beFlushed = true; memcpy(pNewPos->pRowBuff, p, *pVLen); taosMemoryFree(p); return pNewPos; @@ -217,6 +218,7 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); pNewPos->needFree = true; + pNewPos->beFlushed = true; void* pBuff = NULL; int32_t code = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen); if (code != TSDB_CODE_SUCCESS) { @@ -307,6 +309,7 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream } pNewPos = getNewRowPosForWrite(pFileState); pNewPos->needFree = true; + pNewPos->beFlushed = true; } _end: @@ -482,6 +485,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void SRowBuffPos* pNewPos = getNewRowPosForWrite(pCur->pStreamFileState); memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); pNewPos->needFree = true; + pNewPos->beFlushed = true; memcpy(pNewPos->pRowBuff, pData, *pVLen); (*pVal) = pNewPos; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 19b7359981..ea20f0e2b1 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -698,6 +698,7 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void stDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey, key->win.ekey, key->groupId, code); } else { + pos->beFlushed = false; code = putSessionWinResultBuff(pState->pFileState, value); } } diff --git a/tests/script/tsim/stream/partitionby1.sim b/tests/script/tsim/stream/partitionby1.sim index d92aecb3a6..24c588d410 100644 --- a/tests/script/tsim/stream/partitionby1.sim +++ b/tests/script/tsim/stream/partitionby1.sim @@ -13,6 +13,8 @@ sql create table ts3 using st tags(3,2,2); sql create table ts4 using st tags(4,2,2); sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from st partition by tbname interval(10s); +sleep 1000 + sql insert into ts1 values(1648791213001,1,12,3,1.0); sql insert into ts2 values(1648791213001,1,12,3,1.0); From 2365054ada6aa591cf78f4cbae2ea63c280a0b85 Mon Sep 17 00:00:00 2001 From: zk66214 Date: Thu, 18 Jan 2024 20:09:03 +0800 Subject: [PATCH 19/22] add common function no_error to TDsql --- tests/pytest/util/sql.py | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 62af9a1af9..92074161b6 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -97,7 +97,24 @@ class TDSql: i+=1 time.sleep(1) pass - + + def no_error(self, sql): + caller = inspect.getframeinfo(inspect.stack()[1][0]) + expectErrOccurred = False + + try: + self.cursor.execute(sql) + except BaseException as e: + expectErrOccurred = True + self.errno = e.errno + error_info = repr(e) + self.error_info = ','.join(error_info[error_info.index('(') + 1:-1].split(",")[:-1]).replace("'", "") + + if expectErrOccurred: + tdLog.exit("%s(%d) failed: sql:%s, unexpect error '%s' occurred" % (caller.filename, caller.lineno, sql, self.error_info)) + else: + tdLog.info("sql:%s, check passed, no ErrInfo occurred" % (sql)) + def error(self, sql, expectedErrno = None, expectErrInfo = None, fullMatched = True): caller = inspect.getframeinfo(inspect.stack()[1][0]) expectErrNotOccured = True @@ -126,9 +143,9 @@ class TDSql: if expectErrInfo != None: if expectErrInfo == self.error_info: - tdLog.info("sql:%s, expected expectErrInfo '%s' occured" % (sql, expectErrInfo)) + tdLog.info("sql:%s, expected ErrInfo '%s' occured" % (sql, expectErrInfo)) else: - tdLog.exit("%s(%d) failed: sql:%s, expectErrInfo '%s' occured, but not expected expectErrInfo '%s'" % (caller.filename, caller.lineno, sql, self.error_info, expectErrInfo)) + tdLog.exit("%s(%d) failed: sql:%s, ErrInfo '%s' occured, but not expected ErrInfo '%s'" % (caller.filename, caller.lineno, sql, self.error_info, expectErrInfo)) else: if expectedErrno != None: if expectedErrno in self.errno: @@ -138,9 +155,9 @@ class TDSql: if expectErrInfo != None: if expectErrInfo in self.error_info: - tdLog.info("sql:%s, expected expectErrInfo '%s' occured" % (sql, expectErrInfo)) + tdLog.info("sql:%s, expected ErrInfo '%s' occured" % (sql, expectErrInfo)) else: - tdLog.exit("%s(%d) failed: sql:%s, expectErrInfo %s occured, but not expected expectErrInfo '%s'" % (caller.filename, caller.lineno, sql, self.error_info, expectErrInfo)) + tdLog.exit("%s(%d) failed: sql:%s, ErrInfo %s occured, but not expected ErrInfo '%s'" % (caller.filename, caller.lineno, sql, self.error_info, expectErrInfo)) return self.error_info From ba48115231b9b5fb6413bc5e12f4adeb9053f465 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 19 Jan 2024 10:24:39 +0800 Subject: [PATCH 20/22] fix: heap user after free --- source/client/inc/clientInt.h | 1 + source/client/src/clientEnv.c | 1 + source/client/src/clientHb.c | 22 +++++++++++++--------- source/client/src/clientMsgHandler.c | 20 ++++++++++++++++++-- 4 files changed, 33 insertions(+), 11 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index a6d5039be7..fd4776664c 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -155,6 +155,7 @@ typedef struct STscObj { int8_t biMode; int32_t acctId; uint32_t connId; + int32_t appHbMgrIdx; int64_t id; // ref ID returned by taosAddRef TdThreadMutex mutex; // used to protect the operation on db int32_t numOfReqs; // number of sqlObj bound to this connection diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index b6c5701915..f1e9e36433 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -283,6 +283,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c pObj->connType = connType; pObj->pAppInfo = pAppInfo; + pObj->appHbMgrIdx = pAppInfo->pAppHbMgr->idx; tstrncpy(pObj->user, user, sizeof(pObj->user)); memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index b3d9a9ced5..e076a874d3 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -30,7 +30,7 @@ typedef struct { }; } SHbParam; -static SClientHbMgr clientHbMgr = {0}; +SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); static void hbStopThread(); @@ -1294,9 +1294,8 @@ void hbMgrCleanUp() { taosThreadMutexLock(&clientHbMgr.lock); appHbMgrCleanup(); - taosArrayDestroy(clientHbMgr.appHbMgrs); + clientHbMgr.appHbMgrs = taosArrayDestroy(clientHbMgr.appHbMgrs); taosThreadMutexUnlock(&clientHbMgr.lock); - clientHbMgr.appHbMgrs = NULL; } int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clusterId) { @@ -1335,13 +1334,18 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in } void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) { - SAppHbMgr *pAppHbMgr = pTscObj->pAppInfo->pAppHbMgr; - SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); - if (pReq) { - tFreeClientHbReq(pReq); - taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); - taosHashRelease(pAppHbMgr->activeInfo, pReq); + SClientHbReq *pReq = NULL; + taosThreadMutexLock(&clientHbMgr.lock); + SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx); + if (pAppHbMgr) { + pReq = taosHashAcquire(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + if (pReq) { + tFreeClientHbReq(pReq); + taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + taosHashRelease(pAppHbMgr->activeInfo, pReq); + } } + taosThreadMutexUnlock(&clientHbMgr.lock); if (NULL == pReq) { return; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index e0cedb9924..fbef9dfad4 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -26,6 +26,8 @@ #include "tname.h" #include "tversion.h" +extern SClientHbMgr clientHbMgr; + static void setErrno(SRequestObj* pRequest, int32_t code) { pRequest->code = code; terrno = code; @@ -63,12 +65,21 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { STscObj* pTscObj = pRequest->pTscObj; - if (NULL == pTscObj->pAppInfo || NULL == pTscObj->pAppInfo->pAppHbMgr) { + if (NULL == pTscObj->pAppInfo) { setErrno(pRequest, TSDB_CODE_TSC_DISCONNECTED); tsem_post(&pRequest->body.rspSem); goto End; } + taosThreadMutexLock(&clientHbMgr.lock); + if (NULL == taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx)) { + taosThreadMutexUnlock(&clientHbMgr.lock); + setErrno(pRequest, TSDB_CODE_TSC_DISCONNECTED); + tsem_post(&pRequest->body.rspSem); + goto End; + } + taosThreadMutexUnlock(&clientHbMgr.lock); + SConnectRsp connectRsp = {0}; if (tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp) != 0) { code = TSDB_CODE_TSC_INVALID_VERSION; @@ -142,7 +153,12 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { pTscObj->authVer = connectRsp.authVer; pTscObj->whiteListInfo.ver = connectRsp.whiteListVer; - hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); + taosThreadMutexLock(&clientHbMgr.lock); + SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx); + if (pAppHbMgr) { + hbRegisterConn(pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); + } + taosThreadMutexUnlock(&clientHbMgr.lock); tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId, pTscObj->pAppInfo->numOfConns); From ff06f9ef4260c92069fa993504da6af00b27bc78 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 19 Jan 2024 14:31:02 +0800 Subject: [PATCH 21/22] fix: heap user after free --- source/client/src/clientMsgHandler.c | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index fbef9dfad4..324b99022b 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -66,20 +66,12 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { STscObj* pTscObj = pRequest->pTscObj; if (NULL == pTscObj->pAppInfo) { - setErrno(pRequest, TSDB_CODE_TSC_DISCONNECTED); + code = TSDB_CODE_TSC_DISCONNECTED; + setErrno(pRequest, code); tsem_post(&pRequest->body.rspSem); goto End; } - taosThreadMutexLock(&clientHbMgr.lock); - if (NULL == taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx)) { - taosThreadMutexUnlock(&clientHbMgr.lock); - setErrno(pRequest, TSDB_CODE_TSC_DISCONNECTED); - tsem_post(&pRequest->body.rspSem); - goto End; - } - taosThreadMutexUnlock(&clientHbMgr.lock); - SConnectRsp connectRsp = {0}; if (tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp) != 0) { code = TSDB_CODE_TSC_INVALID_VERSION; @@ -106,7 +98,8 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { } if (connectRsp.epSet.numOfEps == 0) { - setErrno(pRequest, TSDB_CODE_APP_ERROR); + code = TSDB_CODE_APP_ERROR; + setErrno(pRequest, code); tsem_post(&pRequest->body.rspSem); goto End; } @@ -157,6 +150,12 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx); if (pAppHbMgr) { hbRegisterConn(pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); + } else { + taosThreadMutexUnlock(&clientHbMgr.lock); + code = TSDB_CODE_TSC_DISCONNECTED; + setErrno(pRequest, code); + tsem_post(&pRequest->body.rspSem); + goto End; } taosThreadMutexUnlock(&clientHbMgr.lock); From bc9dfd8a8325bff3a7fc7d081991218a7d333eea Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 19 Jan 2024 14:42:02 +0800 Subject: [PATCH 22/22] fix: heap use after free --- source/client/src/clientHb.c | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index e076a874d3..63a65d7c95 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -1334,24 +1334,18 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in } void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) { - SClientHbReq *pReq = NULL; taosThreadMutexLock(&clientHbMgr.lock); SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx); if (pAppHbMgr) { - pReq = taosHashAcquire(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); if (pReq) { tFreeClientHbReq(pReq); taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); taosHashRelease(pAppHbMgr->activeInfo, pReq); + atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); } } taosThreadMutexUnlock(&clientHbMgr.lock); - - if (NULL == pReq) { - return; - } - - atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); } // set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner