Merge branch '3.0' of github.com:taosdata/TDengine into fix/TD-31288

This commit is contained in:
54liuyao 2024-08-08 10:36:09 +08:00
commit 5c859554d0
37 changed files with 265 additions and 175 deletions

View File

@ -526,19 +526,17 @@ int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj
int32_t code = TSDB_CODE_SUCCESS;
*pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
if (NULL == *pRequest) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
STscObj *pTscObj = acquireTscObj(connId);
if (pTscObj == NULL) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto _return;
TSC_ERR_JRET(terrno);
}
SSyncQueryParam *interParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
if (interParam == NULL) {
releaseTscObj(connId);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _return;
TSC_ERR_JRET(terrno);
}
TSC_ERR_JRET(tsem_init(&interParam->sem, 0, 0));
interParam->pRequest = *pRequest;
@ -566,7 +564,11 @@ int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj
return TSDB_CODE_SUCCESS;
_return:
doDestroyRequest(*pRequest);
if ((*pRequest)->pTscObj) {
doDestroyRequest(*pRequest);
} else {
taosMemoryFree(*pRequest);
}
return code;
}

View File

@ -533,7 +533,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
return code;
}
code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0, 0);
code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code) {
taosMemoryFree(buf);
}

View File

@ -320,7 +320,7 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
return terrno;
}
code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, 0);
code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(pReq);
return terrno;
@ -424,7 +424,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
(void) epsetToStr(&epset, buf, tListLen(buf));
mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, 0);
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(pReq);
return code;
@ -484,7 +484,7 @@ static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTas
}
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0);
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(pReq);
return code;
@ -540,7 +540,8 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask
}
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0);
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0,
TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(pReq);
return code;
@ -713,7 +714,7 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
return code;
}
code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0, 0);
code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
}

View File

@ -1677,7 +1677,7 @@ int32_t mndAcquireUser(SMnode *pMnode, const char *userName, SUserObj **ppUser)
*ppUser = sdbAcquire(pSdb, SDB_USER, userName);
if (*ppUser == NULL) {
if (code == TSDB_CODE_SDB_OBJ_NOT_THERE) {
if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
code = TSDB_CODE_MND_USER_NOT_EXIST;
} else {
code = TSDB_CODE_MND_USER_NOT_AVAILABLE;
@ -3149,7 +3149,8 @@ int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_
(void)memcpy(rsp.user, pUsers[i].user, TSDB_USER_LEN);
(void)taosArrayPush(batchRsp.pArray, &rsp);
}
mError("user:%s, failed to auth user since %s", pUsers[i].user, terrstr());
mError("user:%s, failed to auth user since %s", pUsers[i].user, tstrerror(code));
code = 0;
continue;
}

View File

@ -28,7 +28,7 @@ int32_t tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t num
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo));
if (pLoadInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pLoadInfo->blockData[0].sttBlockIndex = -1;
@ -50,9 +50,8 @@ int32_t tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t num
pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
if (pLoadInfo->aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFreeClear(pLoadInfo);
return code;
return terrno;
}
pLoadInfo->pSchema = pSchema;
@ -358,7 +357,7 @@ static int32_t tValueDupPayload(SValue *pVal) {
char *p = (char *)pVal->pData;
char *pBuf = taosMemoryMalloc(pVal->nData);
if (pBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
memcpy(pBuf, p, pVal->nData);
@ -371,13 +370,15 @@ static int32_t tValueDupPayload(SValue *pVal) {
static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo,
TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
void* px = NULL;
int32_t startIndex = 0;
int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray);
if (numOfBlocks <= 0) {
return code;
}
int32_t startIndex = 0;
while ((startIndex < numOfBlocks) && (pStatisBlkArray->data[startIndex].maxTbid.suid < suid)) {
++startIndex;
}
@ -413,150 +414,113 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
// existed
if (i < rows) {
if (pBlockLoadInfo->info.pUid == NULL) {
pBlockLoadInfo->info.pUid = taosArrayInit(rows, sizeof(int64_t));
pBlockLoadInfo->info.pFirstTs = taosArrayInit(rows, sizeof(int64_t));
pBlockLoadInfo->info.pLastTs = taosArrayInit(rows, sizeof(int64_t));
pBlockLoadInfo->info.pCount = taosArrayInit(rows, sizeof(int64_t));
SSttTableRowsInfo* pInfo = &pBlockLoadInfo->info;
pBlockLoadInfo->info.pFirstKey = taosArrayInit(rows, sizeof(SValue));
pBlockLoadInfo->info.pLastKey = taosArrayInit(rows, sizeof(SValue));
if (pInfo->pUid == NULL) {
pInfo->pUid = taosArrayInit(rows, sizeof(int64_t));
pInfo->pFirstTs = taosArrayInit(rows, sizeof(int64_t));
pInfo->pLastTs = taosArrayInit(rows, sizeof(int64_t));
pInfo->pCount = taosArrayInit(rows, sizeof(int64_t));
pInfo->pFirstKey = taosArrayInit(rows, sizeof(SValue));
pInfo->pLastKey = taosArrayInit(rows, sizeof(SValue));
if (pInfo->pUid == NULL || pInfo->pFirstTs == NULL || pInfo->pLastTs == NULL || pInfo->pCount == NULL ||
pInfo->pFirstKey == NULL || pInfo->pLastKey == NULL) {
code = terrno;
goto _end;
}
}
if (pStatisBlkArray->data[k].maxTbid.suid == suid) {
int32_t size = rows - i;
int32_t offset = i * sizeof(int64_t);
px = taosArrayAddBatch(pBlockLoadInfo->info.pUid, tBufferGetDataAt(&block.uids, offset), size);
if (px == NULL) {
return terrno;
}
px = taosArrayAddBatch(pInfo->pUid, tBufferGetDataAt(&block.uids, offset), size);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
px = taosArrayAddBatch(pBlockLoadInfo->info.pFirstTs, tBufferGetDataAt(&block.firstKeyTimestamps, offset), size);
if (px == NULL){
return terrno;
}
px = taosArrayAddBatch(pInfo->pFirstTs, tBufferGetDataAt(&block.firstKeyTimestamps, offset), size);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
px = taosArrayAddBatch(pBlockLoadInfo->info.pLastTs, tBufferGetDataAt(&block.lastKeyTimestamps, offset), size);
if (px == NULL){
return terrno;
}
px = taosArrayAddBatch(pInfo->pLastTs, tBufferGetDataAt(&block.lastKeyTimestamps, offset), size);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
px = taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, offset), size);
if (px == NULL){
return terrno;
}
px = taosArrayAddBatch(pInfo->pCount, tBufferGetDataAt(&block.counts, offset), size);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
if (block.numOfPKs > 0) {
SValue vFirst = {0}, vLast = {0};
for (int32_t f = i; f < rows; ++f) {
code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst);
if (code) {
break;
}
TSDB_CHECK_CODE(code, lino, _end);
code = tValueDupPayload(&vFirst);
if (code) {
break;
}
TSDB_CHECK_CODE(code, lino, _end);
px = taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pFirstKey, &vFirst);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
// todo add api to clone the original data
code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast);
if (code) {
break;
}
TSDB_CHECK_CODE(code, lino, _end);
code = tValueDupPayload(&vLast);
if (code) {
break;
}
TSDB_CHECK_CODE(code, lino, _end);
px = taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pLastKey, &vLast);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
}
} else {
SValue vFirst = {0};
for (int32_t j = 0; j < size; ++j) {
px = taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pFirstKey, &vFirst);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
px = taosArrayPush(pBlockLoadInfo->info.pLastKey, &vFirst);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pLastKey, &vFirst);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
}
}
} else {
STbStatisRecord record = {0};
while (i < rows) {
(void)tStatisBlockGet(&block, i, &record);
if (record.suid != suid) {
break;
}
px = taosArrayPush(pBlockLoadInfo->info.pUid, &record.uid);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pUid, &record.uid);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
px = taosArrayPush(pBlockLoadInfo->info.pCount, &record.count);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pCount, &record.count);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
px = taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pFirstTs, &record.firstKey.ts);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
px = taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pLastTs, &record.lastKey.ts);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
if (record.firstKey.numOfPKs > 0) {
SValue s = record.firstKey.pks[0];
code = tValueDupPayload(&s);
if (code) {
return code;
}
TSDB_CHECK_CODE(code, lino, _end);
px = taosArrayPush(pBlockLoadInfo->info.pFirstKey, &s);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pFirstKey, &s);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
s = record.lastKey.pks[0];
code = tValueDupPayload(&s);
if (code) {
return code;
}
TSDB_CHECK_CODE(code, lino, _end);
px = taosArrayPush(pBlockLoadInfo->info.pLastKey, &s);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pLastKey, &s);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
} else {
SValue v = {0};
px = taosArrayPush(pBlockLoadInfo->info.pFirstKey, &v);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pFirstKey, &v);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
px = taosArrayPush(pBlockLoadInfo->info.pLastKey, &v);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pLastKey, &v);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
}
i += 1;
@ -565,6 +529,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
}
}
_end:
(void)tStatisBlockDestroy(&block);
double el = (taosGetTimestampUs() - st) / 1000.0;

View File

@ -147,7 +147,10 @@ _error:
if (pInfo != NULL) {
destroyAggOperatorInfo(pInfo);
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}

View File

@ -247,7 +247,10 @@ _error:
}
pInfo->pTableList = NULL;
destroyCacheScanOperator(pInfo);
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
return code;
}
@ -448,7 +451,7 @@ void destroyCacheScanOperator(void* param) {
taosArrayDestroy(pInfo->matchInfo.pList);
tableListDestroy(pInfo->pTableList);
if (pInfo->pLastrowReader != NULL) {
if (pInfo->pLastrowReader != NULL && pInfo->readHandle.api.cacheFn.closeReader != NULL) {
pInfo->readHandle.api.cacheFn.closeReader(pInfo->pLastrowReader);
pInfo->pLastrowReader = NULL;
}

View File

@ -342,7 +342,10 @@ _error:
destroyCountWindowOperatorInfo(pInfo);
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}

View File

@ -145,7 +145,10 @@ _error:
destroyEWindowOperatorInfo(pInfo);
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}

View File

@ -452,7 +452,10 @@ _error:
doDestroyExchangeOperatorInfo(pInfo);
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}
@ -707,8 +710,8 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t
}
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSDataBlock* pBlock = NULL;
if (pColList == NULL) { // data from other sources
blockDataCleanup(pRes);
@ -757,6 +760,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo
QUERY_CHECK_CODE(code, lino, _end);
blockDataDestroy(pBlock);
pBlock = NULL;
}
_end:

View File

@ -567,7 +567,10 @@ _error:
}
pTaskInfo->code = code;
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
return code;
}

View File

@ -1504,7 +1504,10 @@ _error:
destroyGroupCacheOperator(pInfo);
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}

View File

@ -1246,7 +1246,10 @@ _error:
destroyPartitionOperatorInfo(pInfo);
}
pTaskInfo->code = code;
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
TAOS_RETURN(code);
}
@ -1792,7 +1795,10 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart
_error:
pTaskInfo->code = code;
if (pInfo != NULL) destroyStreamPartitionOperatorInfo(pInfo);
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
}

View File

@ -650,7 +650,7 @@ void destroyOperator(SOperatorInfo* pOperator) {
freeResetOperatorParams(pOperator, OP_GET_PARAM, true);
freeResetOperatorParams(pOperator, OP_NOTIFY_PARAM, true);
if (pOperator->fpSet.closeFn != NULL) {
if (pOperator->fpSet.closeFn != NULL && pOperator->info != NULL) {
pOperator->fpSet.closeFn(pOperator->info);
}

View File

@ -180,7 +180,10 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode*
_error:
destroyProjectOperatorInfo(pInfo);
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}
@ -529,7 +532,10 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
_error:
destroyIndefinitOperatorInfo(pInfo);
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}

View File

@ -1466,7 +1466,10 @@ _error:
destroyTableScanOperatorInfo(pInfo);
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}
@ -3777,7 +3780,7 @@ static void destroyStreamScanOperatorInfo(void* param) {
destroyOperator(pStreamScan->pTableScanOp);
}
if (pStreamScan->tqReader) {
if (pStreamScan->tqReader != NULL && pStreamScan->readerFn.tqReaderClose != NULL) {
pStreamScan->readerFn.tqReaderClose(pStreamScan->tqReader);
}
if (pStreamScan->matchInfo.pList) {
@ -4147,7 +4150,10 @@ _error:
destroyStreamScanOperatorInfo(pInfo);
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}
@ -4608,7 +4614,7 @@ static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) {
static void destroyTagScanOperatorInfo(void* param) {
STagScanInfo* pInfo = (STagScanInfo*)param;
if (pInfo->pCtbCursor != NULL) {
if (pInfo->pCtbCursor != NULL && pInfo->pStorageAPI != NULL) {
pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor);
}
taosHashCleanup(pInfo->filterCtx.colHash);
@ -4705,7 +4711,10 @@ _error:
}
if (pInfo != NULL) destroyTagScanOperatorInfo(pInfo);
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
return code;
}
@ -5770,8 +5779,10 @@ void destroyTableMergeScanOperatorInfo(void* param) {
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
// start one reader variable
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
if (pTableScanInfo->base.readerAPI.tsdReaderClose != NULL) {
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
}
for (int32_t i = 0; i < pTableScanInfo->numNextDurationBlocks; ++i) {
if (pTableScanInfo->nextDurationBlocks[i] != NULL) {
@ -5950,7 +5961,10 @@ _error:
pTaskInfo->code = code;
pInfo->base.pTableListInfo = NULL;
if (pInfo != NULL) destroyTableMergeScanOperatorInfo(pInfo);
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
return code;
}
@ -6107,7 +6121,10 @@ _error:
if (pInfo != NULL) {
destoryTableCountScanOperator(pInfo);
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}

View File

@ -164,7 +164,10 @@ _error:
destroySortOperatorInfo(pInfo);
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}
@ -836,6 +839,9 @@ _error:
if (pInfo != NULL) {
destroyGroupSortOperatorInfo(pInfo);
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
return code;
}

View File

@ -926,7 +926,10 @@ _error:
destroyStreamCountAggOperatorInfo(pInfo);
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;

View File

@ -981,7 +981,10 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
_error:
if (pInfo != NULL) destroyStreamEventOperatorInfo(pInfo);
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;

View File

@ -1459,7 +1459,10 @@ _error:
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
}
if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo);
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}

View File

@ -2013,7 +2013,10 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
_error:
if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo);
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}
@ -3832,7 +3835,10 @@ _error:
destroyStreamSessionAggOperatorInfo(pInfo);
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
@ -4088,7 +4094,10 @@ _error:
if (pInfo != NULL) {
destroyStreamSessionAggOperatorInfo(pInfo);
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
@ -4978,7 +4987,10 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
_error:
if (pInfo != NULL) destroyStreamStateOperatorInfo(pInfo);
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
@ -5314,7 +5326,10 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
_error:
if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo);
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}

View File

@ -672,6 +672,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
}
blockDataDestroy(pDataBlock);
pDataBlock = NULL;
if (ret != 0) {
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
pInfo->pCur = NULL;
@ -683,6 +684,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
_end:
if (code != TSDB_CODE_SUCCESS) {
blockDataDestroy(pDataBlock);
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
@ -695,6 +697,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSDataBlock* dataBlock = NULL;
SSysTableScanInfo* pInfo = pOperator->info;
if (pOperator->status == OP_EXEC_DONE) {
@ -704,7 +707,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
blockDataCleanup(pInfo->pRes);
int32_t numOfRows = 0;
SSDataBlock* dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TAGS);
dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TAGS);
code = blockDataEnsureCapacity(dataBlock, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _end);
@ -826,6 +829,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
}
blockDataDestroy(dataBlock);
dataBlock = NULL;
if (ret != 0) {
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
pInfo->pCur = NULL;
@ -837,6 +841,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
blockDataDestroy(dataBlock);
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
pInfo->pCur = NULL;
pTaskInfo->code = code;
@ -1310,9 +1315,11 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
QUERY_CHECK_CODE(code, lino, _end);
blockDataDestroy(p);
p = NULL;
_end:
if (code != TSDB_CODE_SUCCESS) {
blockDataDestroy(p);
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
@ -1325,6 +1332,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSysTableScanInfo* pInfo = pOperator->info;
SSysTableIndex* pIdx = pInfo->pIdx;
SSDataBlock* p = NULL;
blockDataCleanup(pInfo->pRes);
int32_t numOfRows = 0;
@ -1344,7 +1352,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
varDataSetLen(dbname, strlen(varDataVal(dbname)));
SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES);
p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES);
code = blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _end);
@ -1545,12 +1553,14 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
}
blockDataDestroy(p);
p = NULL;
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
blockDataDestroy(p);
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
@ -1563,6 +1573,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int8_t firstMetaCursor = 0;
SSDataBlock* p = NULL;
SSysTableScanInfo* pInfo = pOperator->info;
if (pInfo->pCur == NULL) {
@ -1590,7 +1601,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
varDataSetLen(dbname, strlen(varDataVal(dbname)));
SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES);
p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES);
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
code = blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
@ -1783,6 +1794,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
}
blockDataDestroy(p);
p = NULL;
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
if (ret != 0) {
@ -1796,6 +1808,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
blockDataDestroy(p);
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
@ -2209,7 +2222,10 @@ _error:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}
@ -2244,7 +2260,7 @@ void destroySysScanOperator(void* param) {
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 ||
strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) {
if (pInfo->pAPI != NULL && pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) {
pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
}
@ -2726,7 +2742,9 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
static void destroyBlockDistScanOperatorInfo(void* param) {
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
blockDataDestroy(pDistInfo->pResBlock);
pDistInfo->readHandle.api.tsdReader.tsdReaderClose(pDistInfo->pHandle);
if (pDistInfo->readHandle.api.tsdReader.tsdReaderClose != NULL) {
pDistInfo->readHandle.api.tsdReader.tsdReaderClose(pDistInfo->pHandle);
}
tableListDestroy(pDistInfo->pTableListInfo);
taosMemoryFreeClear(param);
}
@ -2815,6 +2833,9 @@ _error:
pInfo->pTableListInfo = NULL;
destroyBlockDistScanOperatorInfo(pInfo);
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
return code;
}

View File

@ -1208,7 +1208,10 @@ _error:
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (pInfo != NULL) destroyTimeSliceOperatorInfo(pInfo);
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}

View File

@ -1414,7 +1414,10 @@ _error:
if (pInfo != NULL) {
destroyIntervalOperatorInfo(pInfo);
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}
@ -1690,7 +1693,10 @@ _error:
destroyStateWindowOperatorInfo(pInfo);
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}
@ -1783,7 +1789,10 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
_error:
if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}
@ -2094,8 +2103,11 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
return code;
_error:
destroyMAIOperatorInfo(miaInfo);
destroyOperator(pOperator);
if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}
@ -2427,7 +2439,10 @@ _error:
destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
}
destroyOperator(pOperator);
if (pOperator != NULL) {
pOperator->info = NULL;
destroyOperator(pOperator);
}
pTaskInfo->code = code;
return code;
}

View File

@ -242,7 +242,7 @@ int32_t taosThreadCondTimedWait(TdThreadCond *cond, TdThreadMutex *mutex, const
return EINVAL;
#else
int32_t code = pthread_cond_timedwait(cond, mutex, abstime);
if (code) {
if (code && code != ETIMEDOUT) {
terrno = TAOS_SYSTEM_ERROR(code);
return terrno;
}

View File

@ -195,6 +195,10 @@ class TDTestCase:
tdSql.checkData(1, 4, 2)
tdSql.checkData(2, 4, 9)
tdSql.checkData(3, 4, 9)
sql = "SELECT _wstart, last(c1) FROM t6 INTERVAL(1w);"
tdSql.query(sql)
tdSql.checkRows(11)
def test_partition_by_limit_no_agg(self):
sql_template = 'select t1 from meters partition by t1 limit %d'

View File

@ -52,7 +52,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
'pollDelay': 120,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}

View File

@ -52,7 +52,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
'pollDelay': 120,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
@ -121,7 +121,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
'pollDelay': 120,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}

View File

@ -121,7 +121,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
'pollDelay': 120,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}

View File

@ -52,7 +52,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
'pollDelay': 120,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
@ -121,7 +121,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
'pollDelay': 120,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}

View File

@ -54,7 +54,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
'pollDelay': 120,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}

View File

@ -54,7 +54,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
'pollDelay': 120,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}

View File

@ -56,7 +56,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
'pollDelay': 120,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}

View File

@ -56,7 +56,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
'pollDelay': 120,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}

View File

@ -54,7 +54,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
'pollDelay': 120,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
@ -123,7 +123,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
'pollDelay': 120,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}

View File

@ -140,7 +140,7 @@ class TDTestCase:
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 2,
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
@ -190,9 +190,6 @@ class TDTestCase:
# redistribute vgroup
self.redistributeVgroups()
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)

View File

@ -633,7 +633,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
}
int32_t cnt = 0;
while (running) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 5000);
if (tmqmessage) {
cnt++;
msg_process(tmqmessage);