Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize

This commit is contained in:
Hongze Cheng 2023-06-02 09:31:54 +08:00
commit 884d2cc834
4 changed files with 24 additions and 18 deletions

View File

@ -28,7 +28,7 @@
#include "parser.h"
#include "tname.h"
#define MND_TOPIC_VER_NUMBER 2
#define MND_TOPIC_VER_NUMBER 3
#define MND_TOPIC_RESERVE_SIZE 64
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic);
@ -170,7 +170,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
if (sver != 1 && sver != 2) {
if (sver < 1 || sver > MND_TOPIC_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto TOPIC_DECODE_OVER;
}
@ -197,7 +197,9 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT8(pRaw, dataPos, &pTopic->withMeta, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->stbUid, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->stbName, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER);
if (sver >= 3) {
SDB_GET_BINARY(pRaw, dataPos, pTopic->stbName, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER);
}
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
if (pTopic->sql == NULL) {
@ -922,13 +924,12 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
}else if(pTopic->subType == TOPIC_SUB_TYPE__TABLE){
SStbObj *pStb = mndAcquireStb(pMnode, pTopic->stbName);
if (pStb == NULL) {
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
taosMemoryFree(schemaJson);
return -1;
STR_TO_VARSTR(schemaJson, "NULL");
mError("mndRetrieveTopic mndAcquireStb null stbName:%s", pTopic->stbName);
}else{
schemaToJson(pStb->pColumns, pStb->numOfColumns, schemaJson);
mndReleaseStb(pMnode, pStb);
}
schemaToJson(pStb->pColumns, pStb->numOfColumns, schemaJson);
mndReleaseStb(pMnode, pStb);
}else{
STR_TO_VARSTR(schemaJson, "NULL");
}

View File

@ -37,7 +37,9 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
}
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1;
if (tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg) < 0) return -1;
if (pHandle->execHandle.execTb.qmsg != NULL){
if (tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg) < 0) return -1;
}
}
tEndEncode(pEncoder);
return pEncoder->pos;
@ -65,7 +67,9 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
}
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg) < 0) return -1;
if (!tDecodeIsEnd(pDecoder)){
if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg) < 0) return -1;
}
}
tEndDecode(pDecoder);
return 0;
@ -339,7 +343,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
if(strcmp(handle.execHandle.execTb.qmsg, "") != 0) {
if(handle.execHandle.execTb.qmsg != NULL && strcmp(handle.execHandle.execTb.qmsg, "") != 0) {
if (nodesStringToNode(handle.execHandle.execTb.qmsg, &handle.execHandle.execTb.node) != 0) {
tqError("nodesStringToNode error in sub stable, since %s", terrstr());
return -1;

View File

@ -1002,9 +1002,10 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = pInfo->pSrcBlock;
uint64_t groupId = pBlock->info.id.groupId;
SSDataBlock* pRes = pInfo->pRes;
SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
TSKEY* tsCol = (TSKEY*)pTsCol->pData;
pRes->info.id.groupId = groupId;
SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
TSKEY* tsCol = (TSKEY*)pTsCol->pData;
pInfo->srcRowIndex++;
if (pInfo->srcRowIndex == 0) {
keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
@ -1242,7 +1243,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
SSDataBlock* fillResult = NULL;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
if (pInfo->srcRowIndex >= pInfo->pSrcBlock->info.rows) {
if (pInfo->srcRowIndex >= pInfo->pSrcBlock->info.rows || pInfo->pSrcBlock->info.rows == 0) {
// If there are delete datablocks, we receive them first.
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
@ -1281,7 +1282,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
case STREAM_PULL_DATA: {
doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock);
memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
pInfo->srcRowIndex = 0;
pInfo->srcRowIndex = -1;
} break;
case STREAM_CREATE_CHILD_TABLE: {
return pBlock;
@ -1497,7 +1498,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
goto _error;
}
pInfo->srcRowIndex = 0;
pInfo->srcRowIndex = -1;
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet =

View File

@ -2904,7 +2904,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
pScanInfo->pState = pAggSup->pState;
if ((!pScanInfo->igCheckUpdate || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) && !pScanInfo->pUpdateInfo) {
if (!pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark);
}
pScanInfo->twAggSup = *pTwSup;