sma add ret check
This commit is contained in:
parent
d35bdba180
commit
be04bfa1ce
|
@ -10590,7 +10590,7 @@ int32_t tCloneTbTSMAInfo(STableTSMAInfo *pInfo, STableTSMAInfo **pRes) {
|
|||
if (!pRet->ast) code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (code) {
|
||||
tFreeTableTSMAInfo(pRet);
|
||||
tFreeAndClearTableTSMAInfo(pRet);
|
||||
pRet = NULL;
|
||||
}
|
||||
*pRes = pRet;
|
||||
|
|
|
@ -298,12 +298,7 @@ void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma) {
|
|||
sdbRelease(pSdb, pSma);
|
||||
}
|
||||
|
||||
SDbObj *mndAcquireDbBySma(SMnode *pMnode, const char *smaName) {
|
||||
SName name = {0};
|
||||
tNameFromString(&name, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
|
||||
char db[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(&name, db);
|
||||
SDbObj *mndAcquireDbBySma(SMnode *pMnode, const char *db) {
|
||||
|
||||
return mndAcquireDb(pMnode, db);
|
||||
}
|
||||
|
@ -312,7 +307,10 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm
|
|||
SEncoder encoder = {0};
|
||||
int32_t contLen = 0;
|
||||
SName name = {0};
|
||||
tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
int32_t code = tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SVCreateTSmaReq req = {0};
|
||||
req.version = 0;
|
||||
|
@ -370,7 +368,7 @@ static void *mndBuildVDropSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma,
|
|||
SEncoder encoder = {0};
|
||||
int32_t contLen;
|
||||
SName name = {0};
|
||||
tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
(void)tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
|
||||
SVDropTSmaReq req = {0};
|
||||
req.indexUid = pSma->uid;
|
||||
|
@ -782,10 +780,14 @@ static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) {
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static void mndGetStreamNameFromSmaName(char *streamName, char *smaName) {
|
||||
static int32_t mndGetStreamNameFromSmaName(char *streamName, char *smaName) {
|
||||
SName n;
|
||||
tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
int32_t code = tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
sprintf(streamName, "%d.%s", n.acctId, n.tname);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
|
||||
|
@ -817,7 +819,10 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
mndGetStreamNameFromSmaName(streamName, createReq.name);
|
||||
code = mndGetStreamNameFromSmaName(streamName, createReq.name);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
code = mndAcquireStream(pMnode, streamName, &pStream);
|
||||
if (pStream != NULL || code == 0) {
|
||||
|
@ -843,7 +848,15 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
pDb = mndAcquireDbBySma(pMnode, createReq.name);
|
||||
SName name = {0};
|
||||
code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _OVER;
|
||||
}
|
||||
char db[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
(void)tNameGetFullDbName(&name, db);
|
||||
|
||||
pDb = mndAcquireDb(pMnode, db);
|
||||
if (pDb == NULL) {
|
||||
code = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
goto _OVER;
|
||||
|
@ -964,6 +977,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
|
|||
SVgObj *pVgroup = NULL;
|
||||
SStbObj *pStb = NULL;
|
||||
STrans *pTrans = NULL;
|
||||
SStreamObj *pStream = NULL;
|
||||
|
||||
pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
|
||||
if (pVgroup == NULL) {
|
||||
|
@ -993,9 +1007,11 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
|
|||
mndTransSetSerial(pTrans);
|
||||
|
||||
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
mndGetStreamNameFromSmaName(streamName, pSma->name);
|
||||
code = mndGetStreamNameFromSmaName(streamName, pSma->name);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SStreamObj *pStream = NULL;
|
||||
|
||||
code = mndAcquireStream(pMnode, streamName, &pStream);
|
||||
if (pStream == NULL || pStream->smaId != pSma->uid || code != 0) {
|
||||
|
@ -1054,7 +1070,10 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
|
|||
}
|
||||
|
||||
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
mndGetStreamNameFromSmaName(streamName, pSma->name);
|
||||
code = mndGetStreamNameFromSmaName(streamName, pSma->name);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SStreamObj *pStream = NULL;
|
||||
code = mndAcquireStream(pMnode, streamName, &pStream);
|
||||
|
@ -1144,7 +1163,15 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
pDb = mndAcquireDbBySma(pMnode, dropReq.name);
|
||||
SName name = {0};
|
||||
code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _OVER;
|
||||
}
|
||||
char db[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
(void)tNameGetFullDbName(&name, db);
|
||||
|
||||
pDb = mndAcquireDb(pMnode, db);
|
||||
if (pDb == NULL) {
|
||||
code = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
goto _OVER;
|
||||
|
@ -1303,7 +1330,11 @@ static int32_t mndProcessGetSmaReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
tSerializeSUserIndexRsp(pRsp, contLen, &rsp);
|
||||
contLen = tSerializeSUserIndexRsp(pRsp, contLen, &rsp);
|
||||
if (contLen < 0) {
|
||||
code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pReq->info.rsp = pRsp;
|
||||
pReq->info.rspLen = contLen;
|
||||
|
@ -1349,7 +1380,11 @@ static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
tSerializeSTableIndexRsp(pRsp, contLen, &rsp);
|
||||
contLen = tSerializeSTableIndexRsp(pRsp, contLen, &rsp);
|
||||
if (contLen < 0) {
|
||||
code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pReq->info.rsp = pRsp;
|
||||
pReq->info.rspLen = contLen;
|
||||
|
@ -1372,6 +1407,7 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
|
|||
int32_t numOfRows = 0;
|
||||
SSmaObj *pSma = NULL;
|
||||
int32_t cols = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
SDbObj *pDb = NULL;
|
||||
if (strlen(pShow->db) > 0) {
|
||||
|
@ -1391,47 +1427,63 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
|
|||
cols = 0;
|
||||
|
||||
SName smaName = {0};
|
||||
tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(n1, (char *)tNameGetTableName(&smaName));
|
||||
|
||||
char n2[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(n2, (char *)mndGetDbStr(pSma->db));
|
||||
|
||||
SName stbName = {0};
|
||||
tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
char n2[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
char n3[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(n3, (char *)tNameGetTableName(&stbName));
|
||||
code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
STR_TO_VARSTR(n1, (char *)tNameGetTableName(&smaName));
|
||||
STR_TO_VARSTR(n2, (char *)mndGetDbStr(pSma->db));
|
||||
code = tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
}
|
||||
SColumnInfoData* pColInfo = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
STR_TO_VARSTR(n3, (char *)tNameGetTableName(&stbName));
|
||||
|
||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)n1, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)n2, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)n3, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
code = colDataSetVal(pColInfo, numOfRows, (const char *)n1, false);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
code = colDataSetVal(pColInfo, numOfRows, (const char *)n2, false);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
code = colDataSetVal(pColInfo, numOfRows, (const char *)n3, false);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
code = colDataSetVal(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
|
||||
}
|
||||
|
||||
char col[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(col, (char *)"");
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)col, false);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
code = colDataSetVal(pColInfo, numOfRows, (const char *)col, false);
|
||||
}
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
|
||||
char tag[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(tag, (char *)"sma_index");
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)tag, false);
|
||||
char tag[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(tag, (char *)"sma_index");
|
||||
code = colDataSetVal(pColInfo, numOfRows, (const char *)tag, false);
|
||||
}
|
||||
|
||||
numOfRows++;
|
||||
sdbRelease(pSdb, pSma);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
sdbCancelFetch(pMnode->pSdb, pIter->pSmaIter);
|
||||
numOfRows = -1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
|
@ -1520,7 +1572,7 @@ static void initStreamObj(SStreamObj *pStream, const char *streamName, const SMC
|
|||
pStream->ast = taosStrdup(pSma->ast);
|
||||
}
|
||||
|
||||
static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
|
||||
static int32_t mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
|
||||
tstrncpy(pCxt->pCreateStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
|
||||
tstrncpy(pCxt->pCreateStreamReq->sourceDB, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
|
||||
tstrncpy(pCxt->pCreateStreamReq->targetStbFullName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
|
||||
|
@ -1540,11 +1592,21 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
|
|||
pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs;
|
||||
pCxt->pCreateStreamReq->smaId = pCxt->pSma->uid;
|
||||
pCxt->pCreateStreamReq->ast = taosStrdup(pCxt->pCreateSmaReq->ast);
|
||||
if (!pCxt->pCreateStreamReq->ast) {
|
||||
return terrno;
|
||||
}
|
||||
pCxt->pCreateStreamReq->sql = taosStrdup(pCxt->pCreateSmaReq->sql);
|
||||
if (!pCxt->pCreateStreamReq->sql) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
// construct tags
|
||||
pCxt->pCreateStreamReq->pTags = taosArrayInit(pCxt->pCreateStreamReq->numOfTags, sizeof(SField));
|
||||
if (!pCxt->pCreateStreamReq->pTags) {
|
||||
return terrno;
|
||||
}
|
||||
SField f = {0};
|
||||
int32_t code = 0;
|
||||
if (pCxt->pSrcStb) {
|
||||
for (int32_t idx = 0; idx < pCxt->pCreateStreamReq->numOfTags - 1; ++idx) {
|
||||
SSchema *pSchema = &pCxt->pSrcStb->pTags[idx];
|
||||
|
@ -1552,25 +1614,39 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
|
|||
f.type = pSchema->type;
|
||||
f.flags = pSchema->flags;
|
||||
tstrncpy(f.name, pSchema->name, TSDB_COL_NAME_LEN);
|
||||
taosArrayPush(pCxt->pCreateStreamReq->pTags, &f);
|
||||
if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pTags, &f)) {
|
||||
code = terrno;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
f.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
|
||||
f.flags = COL_SMA_ON;
|
||||
f.type = TSDB_DATA_TYPE_BINARY;
|
||||
tstrncpy(f.name, "tbname", strlen("tbname") + 1);
|
||||
taosArrayPush(pCxt->pCreateStreamReq->pTags, &f);
|
||||
|
||||
// construct output cols
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pCxt->pProjects) {
|
||||
SExprNode* pExprNode = (SExprNode*)pNode;
|
||||
f.bytes = pExprNode->resType.bytes;
|
||||
f.type = pExprNode->resType.type;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
f.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
|
||||
f.flags = COL_SMA_ON;
|
||||
strcpy(f.name, pExprNode->userAlias);
|
||||
taosArrayPush(pCxt->pCreateStreamReq->pCols, &f);
|
||||
f.type = TSDB_DATA_TYPE_BINARY;
|
||||
tstrncpy(f.name, "tbname", strlen("tbname") + 1);
|
||||
if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pTags, &f)) {
|
||||
code = terrno;
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
// construct output cols
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pCxt->pProjects) {
|
||||
SExprNode* pExprNode = (SExprNode*)pNode;
|
||||
f.bytes = pExprNode->resType.bytes;
|
||||
f.type = pExprNode->resType.type;
|
||||
f.flags = COL_SMA_ON;
|
||||
strcpy(f.name, pExprNode->userAlias);
|
||||
if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pCols, &f)) {
|
||||
code = terrno;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static void mndCreateTSMABuildDropStreamReq(SCreateTSMACxt* pCxt) {
|
||||
|
@ -1733,7 +1809,10 @@ static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
|
|||
}
|
||||
}
|
||||
pCxt->pDropStreamReq = &dropStreamReq;
|
||||
mndCreateTSMABuildCreateStreamReq(pCxt);
|
||||
code = mndCreateTSMABuildCreateStreamReq(pCxt);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _OVER;
|
||||
}
|
||||
mndCreateTSMABuildDropStreamReq(pCxt);
|
||||
|
||||
if (TSDB_CODE_SUCCESS != (code = mndCreateTSMATxnPrepare(pCxt))) {
|
||||
|
@ -1753,11 +1832,15 @@ _OVER:
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static void mndTSMAGenerateOutputName(const char* tsmaName, char* streamName, char* targetStbName) {
|
||||
static int32_t mndTSMAGenerateOutputName(const char* tsmaName, char* streamName, char* targetStbName) {
|
||||
SName smaName;
|
||||
tNameFromString(&smaName, tsmaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
int32_t code = tNameFromString(&smaName, tsmaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname);
|
||||
snprintf(targetStbName, TSDB_TABLE_FNAME_LEN, "%s"TSMA_RES_STB_POSTFIX, tsmaName);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
|
||||
|
@ -1799,7 +1882,11 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
|
|||
|
||||
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
mndTSMAGenerateOutputName(createReq.name, streamName, streamTargetStbFullName);
|
||||
code = mndTSMAGenerateOutputName(createReq.name, streamName, streamTargetStbFullName);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
mInfo("tsma:%s, faield to generate name", createReq.name);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.name);
|
||||
if (pSma && createReq.igExists) {
|
||||
|
@ -1828,7 +1915,15 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
pDb = mndAcquireDbBySma(pMnode, createReq.name);
|
||||
SName name = {0};
|
||||
code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _OVER;
|
||||
}
|
||||
char db[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
(void)tNameGetFullDbName(&name, db);
|
||||
|
||||
pDb = mndAcquireDb(pMnode, db);
|
||||
if (pDb == NULL) {
|
||||
code = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
goto _OVER;
|
||||
|
@ -1978,7 +2073,10 @@ static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) {
|
|||
|
||||
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
mndTSMAGenerateOutputName(dropReq.name, streamName, streamTargetStbFullName);
|
||||
code = mndTSMAGenerateOutputName(dropReq.name, streamName, streamTargetStbFullName);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SStbObj* pStb = mndAcquireStb(pMnode, streamTargetStbFullName);
|
||||
|
||||
|
@ -1991,7 +2089,15 @@ static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) {
|
|||
code = TSDB_CODE_MND_SMA_NOT_EXIST;
|
||||
goto _OVER;
|
||||
}
|
||||
pDb = mndAcquireDbBySma(pMnode, dropReq.name);
|
||||
SName name = {0};
|
||||
code = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _OVER;
|
||||
}
|
||||
char db[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
(void)tNameGetFullDbName(&name, db);
|
||||
|
||||
pDb = mndAcquireDb(pMnode, db);
|
||||
if (!pDb) {
|
||||
code = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||
goto _OVER;
|
||||
|
@ -2032,6 +2138,7 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
|
|||
int32_t numOfRows = 0;
|
||||
SSmaObj * pSma = NULL;
|
||||
SMnode * pMnode = pReq->info.node;
|
||||
int32_t code = 0;
|
||||
SColumnInfoData *pColInfo;
|
||||
if (pShow->db[0]) {
|
||||
pDb = mndAcquireDb(pMnode, pShow->db);
|
||||
|
@ -2054,65 +2161,90 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
|
|||
int32_t cols = 0;
|
||||
SName n = {0};
|
||||
|
||||
tNameFromString(&n, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
code = tNameFromString(&n, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
char smaName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(smaName, (char *)tNameGetTableName(&n));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
STR_TO_VARSTR(smaName, (char *)tNameGetTableName(&n));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
code = colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
|
||||
}
|
||||
|
||||
char db[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(db, (char *)mndGetDbStr(pSma->db));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char*)db, false);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
STR_TO_VARSTR(db, (char *)mndGetDbStr(pSma->db));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
code = colDataSetVal(pColInfo, numOfRows, (const char*)db, false);
|
||||
}
|
||||
|
||||
tNameFromString(&n, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tNameFromString(&n, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
}
|
||||
char srcTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(srcTb, (char *)tNameGetTableName(&n));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char*)srcTb, false);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
STR_TO_VARSTR(srcTb, (char *)tNameGetTableName(&n));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
code = colDataSetVal(pColInfo, numOfRows, (const char*)srcTb, false);
|
||||
}
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char*)db, false);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
code = colDataSetVal(pColInfo, numOfRows, (const char*)db, false);
|
||||
}
|
||||
|
||||
tNameFromString(&n, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
char targetTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(targetTb, (char*)tNameGetTableName(&n));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char*)targetTb, false);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tNameFromString(&n, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
}
|
||||
|
||||
// stream name
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char*)smaName, false);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
char targetTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(targetTb, (char*)tNameGetTableName(&n));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
code = colDataSetVal(pColInfo, numOfRows, (const char*)targetTb, false);
|
||||
}
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char*)(&pSma->createdTime), false);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
// stream name
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
code = colDataSetVal(pColInfo, numOfRows, (const char*)smaName, false);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
code = colDataSetVal(pColInfo, numOfRows, (const char*)(&pSma->createdTime), false);
|
||||
}
|
||||
|
||||
// interval
|
||||
char interval[64 + VARSTR_HEADER_SIZE] = {0};
|
||||
int32_t len = 0;
|
||||
if (!IS_CALENDAR_TIME_DURATION(pSma->intervalUnit)) {
|
||||
len = snprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
|
||||
getPrecisionUnit(pSrcDb->cfg.precision));
|
||||
} else {
|
||||
len = snprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, pSma->intervalUnit);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (!IS_CALENDAR_TIME_DURATION(pSma->intervalUnit)) {
|
||||
len = snprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
|
||||
getPrecisionUnit(pSrcDb->cfg.precision));
|
||||
} else {
|
||||
len = snprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, pSma->intervalUnit);
|
||||
}
|
||||
varDataSetLen(interval, len);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
code = colDataSetVal(pColInfo, numOfRows, interval, false);
|
||||
}
|
||||
varDataSetLen(interval, len);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, interval, false);
|
||||
|
||||
// create sql
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
char buf[TSDB_MAX_SAVED_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
len = snprintf(buf + VARSTR_HEADER_SIZE, TSDB_MAX_SAVED_SQL_LEN, "%s", pSma->sql);
|
||||
varDataSetLen(buf, TMIN(len, TSDB_MAX_SAVED_SQL_LEN));
|
||||
colDataSetVal(pColInfo, numOfRows, buf, false);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
// create sql
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
len = snprintf(buf + VARSTR_HEADER_SIZE, TSDB_MAX_SAVED_SQL_LEN, "%s", pSma->sql);
|
||||
varDataSetLen(buf, TMIN(len, TSDB_MAX_SAVED_SQL_LEN));
|
||||
code = colDataSetVal(pColInfo, numOfRows, buf, false);
|
||||
}
|
||||
|
||||
// func list
|
||||
len = 0;
|
||||
char * start = buf + VARSTR_HEADER_SIZE;
|
||||
SNode *pNode = NULL, *pFunc = NULL;
|
||||
nodesStringToNode(pSma->ast, &pNode);
|
||||
if (pNode) {
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesStringToNode(pSma->ast, &pNode);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
char * start = buf + VARSTR_HEADER_SIZE;
|
||||
FOREACH(pFunc, ((SSelectStmt *)pNode)->pProjectionList) {
|
||||
if (nodeType(pFunc) == QUERY_NODE_FUNCTION) {
|
||||
SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
|
||||
|
@ -2128,13 +2260,21 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
|
|||
}
|
||||
nodesDestroyNode(pNode);
|
||||
}
|
||||
varDataSetLen(buf, len);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, buf, false);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
varDataSetLen(buf, len);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
code = colDataSetVal(pColInfo, numOfRows, buf, false);
|
||||
}
|
||||
|
||||
numOfRows++;
|
||||
mndReleaseSma(pMnode, pSma);
|
||||
mndReleaseDb(pMnode, pSrcDb);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
sdbCancelFetch(pMnode->pSdb, pIter->pSmaIter);
|
||||
numOfRows = -1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
pShow->numOfRows += numOfRows;
|
||||
|
@ -2163,13 +2303,22 @@ int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STa
|
|||
pInfo->tsmaId = pSma->uid;
|
||||
pInfo->destTbUid = pDestStb->uid;
|
||||
SName sName = {0};
|
||||
tNameFromString(&sName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
code = tNameFromString(&sName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
tstrncpy(pInfo->name, sName.tname, TSDB_TABLE_NAME_LEN);
|
||||
tstrncpy(pInfo->targetDbFName, pSma->db, TSDB_DB_FNAME_LEN);
|
||||
tNameFromString(&sName, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
code = tNameFromString(&sName, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
tstrncpy(pInfo->targetTb, sName.tname, TSDB_TABLE_NAME_LEN);
|
||||
tstrncpy(pInfo->dbFName, pSma->db, TSDB_DB_FNAME_LEN);
|
||||
tNameFromString(&sName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
code = tNameFromString(&sName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
tstrncpy(pInfo->tb, sName.tname, TSDB_TABLE_NAME_LEN);
|
||||
pInfo->pFuncs = taosArrayInit(8, sizeof(STableTSMAFuncInfo));
|
||||
if (!pInfo->pFuncs) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -2206,7 +2355,10 @@ int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STa
|
|||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
for (int32_t i = 0; i < pDestStb->numOfTags; ++i) {
|
||||
taosArrayPush(pInfo->pTags, &pDestStb->pTags[i]);
|
||||
if (NULL == taosArrayPush(pInfo->pTags, &pDestStb->pTags[i])) {
|
||||
code = terrno;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2217,7 +2369,10 @@ int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STa
|
|||
else {
|
||||
// skip _wstart, _wend, _duration
|
||||
for (int32_t i = 1; i < pDestStb->numOfColumns - 2; ++i) {
|
||||
taosArrayPush(pInfo->pUsedCols, &pDestStb->pColumns[i]);
|
||||
if (NULL == taosArrayPush(pInfo->pUsedCols, &pDestStb->pColumns[i])) {
|
||||
code = terrno;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2280,12 +2435,12 @@ static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rs
|
|||
sdbRelease(pMnode->pSdb, pSma);
|
||||
if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
|
||||
if (terrno) {
|
||||
tFreeTableTSMAInfo(pTsma);
|
||||
tFreeAndClearTableTSMAInfo(pTsma);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tFreeTableTSMAInfo(pTsma);
|
||||
tFreeAndClearTableTSMAInfo(pTsma);
|
||||
}
|
||||
*exist = true;
|
||||
}
|
||||
|
@ -2320,7 +2475,11 @@ static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilt
|
|||
|
||||
SName smaName;
|
||||
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
sdbRelease(pSdb, pSma);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname);
|
||||
pStream = NULL;
|
||||
|
||||
|
@ -2351,13 +2510,13 @@ static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilt
|
|||
sdbRelease(pSdb, pSma);
|
||||
if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
|
||||
if (terrno) {
|
||||
tFreeTableTSMAInfo(pTsma);
|
||||
tFreeAndClearTableTSMAInfo(pTsma);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (NULL == taosArrayPush(pRsp->pTsmas, &pTsma)) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tFreeTableTSMAInfo(pTsma);
|
||||
tFreeAndClearTableTSMAInfo(pTsma);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
@ -2418,7 +2577,11 @@ static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
tSerializeTableTSMAInfoRsp(pRsp, contLen, &rsp);
|
||||
int32_t len = tSerializeTableTSMAInfoRsp(pRsp, contLen, &rsp);
|
||||
if (len < 0) {
|
||||
code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pReq->info.rsp = pRsp;
|
||||
pReq->info.rspLen = contLen;
|
||||
|
@ -2434,7 +2597,7 @@ _OVER:
|
|||
static int32_t mkNonExistTSMAInfo(const STSMAVersion *pTsmaVer, STableTSMAInfo **ppTsma) {
|
||||
STableTSMAInfo *pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
|
||||
if (!pInfo) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
pInfo->pFuncs = NULL;
|
||||
pInfo->tsmaId = pTsmaVer->tsmaId;
|
||||
|
@ -2443,6 +2606,10 @@ static int32_t mkNonExistTSMAInfo(const STSMAVersion *pTsmaVer, STableTSMAInfo *
|
|||
tstrncpy(pInfo->name, pTsmaVer->name, TSDB_TABLE_NAME_LEN);
|
||||
pInfo->dbId = pTsmaVer->dbId;
|
||||
pInfo->ast = taosMemoryCalloc(1, 1);
|
||||
if (!pInfo->ast) {
|
||||
taosMemoryFree(pInfo);
|
||||
return terrno;
|
||||
}
|
||||
*ppTsma = pInfo;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -2458,7 +2625,7 @@ int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t
|
|||
|
||||
hbRsp.pTsmas = taosArrayInit(numOfTsmas, POINTER_BYTES);
|
||||
if (!hbRsp.pTsmas) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
|
@ -2473,7 +2640,11 @@ int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t
|
|||
if (!pSma) {
|
||||
code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
|
||||
if (code) goto _OVER;
|
||||
taosArrayPush(hbRsp.pTsmas, &pTsmaInfo);
|
||||
if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
|
||||
code = terrno;
|
||||
tFreeAndClearTableTSMAInfo(pTsmaInfo);
|
||||
goto _OVER;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -2482,7 +2653,11 @@ int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t
|
|||
code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
|
||||
mndReleaseSma(pMnode, pSma);
|
||||
if (code) goto _OVER;
|
||||
taosArrayPush(hbRsp.pTsmas, &pTsmaInfo);
|
||||
if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
|
||||
code = terrno;
|
||||
tFreeAndClearTableTSMAInfo(pTsmaInfo);
|
||||
goto _OVER;
|
||||
}
|
||||
continue;
|
||||
} else if (pSma->version == pTsmaVer->version) {
|
||||
mndReleaseSma(pMnode, pSma);
|
||||
|
@ -2495,7 +2670,11 @@ int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t
|
|||
code = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
|
||||
mndReleaseSma(pMnode, pSma);
|
||||
if (code) goto _OVER;
|
||||
taosArrayPush(hbRsp.pTsmas, &pTsmaInfo);
|
||||
if (NULL == taosArrayPush(hbRsp.pTsmas, &pTsmaInfo)) {
|
||||
code = terrno;
|
||||
tFreeAndClearTableTSMAInfo(pTsmaInfo);
|
||||
goto _OVER;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -2503,7 +2682,7 @@ int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t
|
|||
STableTSMAInfo * pInfo = NULL;
|
||||
pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
|
||||
if (!pInfo) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = terrno;
|
||||
mndReleaseSma(pMnode, pSma);
|
||||
mndReleaseStb(pMnode, pDestStb);
|
||||
goto _OVER;
|
||||
|
@ -2517,27 +2696,35 @@ int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t
|
|||
mndReleaseSma(pMnode, pSma);
|
||||
if (pBaseSma) mndReleaseSma(pMnode, pBaseSma);
|
||||
if (terrno) {
|
||||
tFreeTableTSMAInfo(pInfo);
|
||||
tFreeAndClearTableTSMAInfo(pInfo);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
taosArrayPush(hbRsp.pTsmas, pInfo);
|
||||
if (NULL == taosArrayPush(hbRsp.pTsmas, pInfo)) {
|
||||
code = terrno;
|
||||
tFreeAndClearTableTSMAInfo(pInfo);
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
|
||||
rspLen = tSerializeTSMAHbRsp(NULL, 0, &hbRsp);
|
||||
if (rspLen < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pRsp = taosMemoryMalloc(rspLen);
|
||||
if (!pRsp) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = terrno;
|
||||
rspLen = 0;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
tSerializeTSMAHbRsp(pRsp, rspLen, &hbRsp);
|
||||
rspLen = tSerializeTSMAHbRsp(pRsp, rspLen, &hbRsp);
|
||||
if (rspLen < 0) {
|
||||
code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
code = 0;
|
||||
_OVER:
|
||||
tFreeTSMAHbRsp(&hbRsp);
|
||||
|
|
|
@ -2672,7 +2672,7 @@ _return:
|
|||
ctgReleaseVgInfoToCache(pCtg, pDbCache);
|
||||
}
|
||||
if (pTsma) {
|
||||
tFreeTableTSMAInfo(pTsma);
|
||||
tFreeAndClearTableTSMAInfo(pTsma);
|
||||
pTsma = NULL;
|
||||
}
|
||||
if (pVgHash) {
|
||||
|
|
|
@ -21,18 +21,19 @@
|
|||
#include "tglobal.h"
|
||||
#include "ttime.h"
|
||||
|
||||
#define CHECK_MAKE_NODE(p) \
|
||||
do { \
|
||||
if (NULL == (p)) { \
|
||||
return NULL; \
|
||||
} \
|
||||
#define CHECK_MAKE_NODE(p) \
|
||||
do { \
|
||||
if (NULL == (p)) { \
|
||||
return NULL; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define CHECK_OUT_OF_MEM(p) \
|
||||
do { \
|
||||
if (NULL == (p)) { \
|
||||
return NULL; \
|
||||
} \
|
||||
#define CHECK_OUT_OF_MEM(p) \
|
||||
do { \
|
||||
if (NULL == (p)) { \
|
||||
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; \
|
||||
return NULL; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define CHECK_PARSER_STATUS(pCxt) \
|
||||
|
@ -269,7 +270,7 @@ SNode* createRawExprNode(SAstCreateContext* pCxt, const SToken* pToken, SNode* p
|
|||
CHECK_PARSER_STATUS(pCxt);
|
||||
SRawExprNode* target = NULL;
|
||||
pCxt->errCode = nodesMakeNode(QUERY_NODE_RAW_EXPR, (SNode**)&target);
|
||||
CHECK_OUT_OF_MEM(target);
|
||||
CHECK_MAKE_NODE(target);
|
||||
target->p = pToken->z;
|
||||
target->n = pToken->n;
|
||||
target->pNode = pNode;
|
||||
|
@ -280,7 +281,7 @@ SNode* createRawExprNodeExt(SAstCreateContext* pCxt, const SToken* pStart, const
|
|||
CHECK_PARSER_STATUS(pCxt);
|
||||
SRawExprNode* target = NULL;
|
||||
pCxt->errCode = nodesMakeNode(QUERY_NODE_RAW_EXPR, (SNode**)&target);
|
||||
CHECK_OUT_OF_MEM(target);
|
||||
CHECK_MAKE_NODE(target);
|
||||
target->p = pStart->z;
|
||||
target->n = (pEnd->z + pEnd->n) - pStart->z;
|
||||
target->pNode = pNode;
|
||||
|
@ -347,7 +348,7 @@ SNodeList* createNodeList(SAstCreateContext* pCxt, SNode* pNode) {
|
|||
CHECK_PARSER_STATUS(pCxt);
|
||||
SNodeList* list = NULL;
|
||||
pCxt->errCode = nodesMakeList(&list);
|
||||
CHECK_OUT_OF_MEM(list);
|
||||
CHECK_MAKE_NODE(list);
|
||||
pCxt->errCode = nodesListAppend(list, pNode);
|
||||
if (TSDB_CODE_SUCCESS != pCxt->errCode) {
|
||||
nodesDestroyList(list);
|
||||
|
@ -369,7 +370,7 @@ SNode* createColumnNode(SAstCreateContext* pCxt, SToken* pTableAlias, SToken* pC
|
|||
}
|
||||
SColumnNode* col = NULL;
|
||||
pCxt->errCode = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&col);
|
||||
CHECK_OUT_OF_MEM(col);
|
||||
CHECK_MAKE_NODE(col);
|
||||
if (NULL != pTableAlias) {
|
||||
COPY_STRING_FORM_ID_TOKEN(col->tableAlias, pTableAlias);
|
||||
}
|
||||
|
@ -381,17 +382,17 @@ SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken*
|
|||
CHECK_PARSER_STATUS(pCxt);
|
||||
SValueNode* val = NULL;
|
||||
pCxt->errCode = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&val);
|
||||
CHECK_OUT_OF_MEM(val);
|
||||
CHECK_MAKE_NODE(val);
|
||||
val->literal = strndup(pLiteral->z, pLiteral->n);
|
||||
if (TK_NK_ID != pLiteral->type && TK_TIMEZONE != pLiteral->type &&
|
||||
(IS_VAR_DATA_TYPE(dataType) || TSDB_DATA_TYPE_TIMESTAMP == dataType)) {
|
||||
(void)trimString(pLiteral->z, pLiteral->n, val->literal, pLiteral->n);
|
||||
}
|
||||
if(!val->literal) {
|
||||
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
|
||||
nodesDestroyNode((SNode*)val);
|
||||
return NULL;
|
||||
}
|
||||
if (TK_NK_ID != pLiteral->type && TK_TIMEZONE != pLiteral->type &&
|
||||
(IS_VAR_DATA_TYPE(dataType) || TSDB_DATA_TYPE_TIMESTAMP == dataType)) {
|
||||
(void)trimString(pLiteral->z, pLiteral->n, val->literal, pLiteral->n);
|
||||
}
|
||||
val->node.resType.type = dataType;
|
||||
val->node.resType.bytes = IS_VAR_DATA_TYPE(dataType) ? strlen(val->literal) : tDataTypes[dataType].bytes;
|
||||
if (TSDB_DATA_TYPE_TIMESTAMP == dataType) {
|
||||
|
@ -522,13 +523,16 @@ bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOpt
|
|||
|
||||
SHintNode* hint = NULL;
|
||||
pCxt->errCode = nodesMakeNode(QUERY_NODE_HINT, (SNode**)&hint);
|
||||
CHECK_OUT_OF_MEM(hint);
|
||||
CHECK_MAKE_NODE(hint);
|
||||
hint->option = opt;
|
||||
hint->value = value;
|
||||
|
||||
if (NULL == *ppHintList) {
|
||||
pCxt->errCode = nodesMakeList(ppHintList);
|
||||
CHECK_OUT_OF_MEM(*ppHintList);
|
||||
if (!*ppHintList) {
|
||||
nodesDestroyNode((SNode*)hint);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
pCxt->errCode = nodesListStrictAppend(*ppHintList, (SNode*)hint);
|
||||
|
@ -681,7 +685,7 @@ SNode* createDurationValueNode(SAstCreateContext* pCxt, const SToken* pLiteral)
|
|||
CHECK_PARSER_STATUS(pCxt);
|
||||
SValueNode* val = NULL;
|
||||
pCxt->errCode = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&val);
|
||||
CHECK_OUT_OF_MEM(val);
|
||||
CHECK_MAKE_NODE(val);
|
||||
if (pLiteral->type == TK_NK_STRING) {
|
||||
// like '100s' or "100d"
|
||||
// check format: ^[0-9]+[smwbauhdny]$'
|
||||
|
@ -733,7 +737,7 @@ SNode* createTimeOffsetValueNode(SAstCreateContext* pCxt, const SToken* pLiteral
|
|||
CHECK_PARSER_STATUS(pCxt);
|
||||
SValueNode* val = NULL;
|
||||
pCxt->errCode = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&val);
|
||||
CHECK_OUT_OF_MEM(val);
|
||||
CHECK_MAKE_NODE(val);
|
||||
if (pLiteral->type == TK_NK_STRING) {
|
||||
// like '100s' or "100d"
|
||||
// check format: ^[0-9]+[smwbauhdny]$'
|
||||
|
@ -899,7 +903,7 @@ SNode* createOperatorNode(SAstCreateContext* pCxt, EOperatorType type, SNode* pL
|
|||
}
|
||||
SOperatorNode* op = NULL;
|
||||
pCxt->errCode = nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op);
|
||||
CHECK_OUT_OF_MEM(op);
|
||||
CHECK_MAKE_NODE(op);
|
||||
op->opType = type;
|
||||
op->pLeft = pLeft;
|
||||
op->pRight = pRight;
|
||||
|
@ -1014,12 +1018,16 @@ SNode* createNodeListNodeEx(SAstCreateContext* pCxt, SNode* p1, SNode* p2) {
|
|||
SNodeListNode* list = NULL;
|
||||
pCxt->errCode = nodesMakeNode(QUERY_NODE_NODE_LIST, (SNode**)&list);
|
||||
CHECK_MAKE_NODE(list);
|
||||
pCxt->errCode = nodesMakeList(&list->pNodeList);
|
||||
CHECK_OUT_OF_MEM(list->pNodeList);
|
||||
pCxt->errCode = nodesListAppend(list->pNodeList, p1);
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
pCxt->errCode = nodesListAppend(list->pNodeList, p2);
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
pCxt->errCode = nodesListMakeStrictAppend(&list->pNodeList, p1);
|
||||
if (TSDB_CODE_SUCCESS != pCxt->errCode) {
|
||||
nodesDestroyNode((SNode*)list);
|
||||
return NULL;
|
||||
}
|
||||
pCxt->errCode = nodesListStrictAppend(list->pNodeList, p2);
|
||||
if (TSDB_CODE_SUCCESS != pCxt->errCode) {
|
||||
nodesDestroyNode((SNode*)list);
|
||||
return NULL;
|
||||
}
|
||||
return (SNode*)list;
|
||||
}
|
||||
|
||||
|
@ -1143,7 +1151,7 @@ SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr) {
|
|||
state->pCol = createPrimaryKeyCol(pCxt, NULL);
|
||||
if (NULL == state->pCol) {
|
||||
nodesDestroyNode((SNode*)state);
|
||||
CHECK_OUT_OF_MEM(NULL);
|
||||
CHECK_MAKE_NODE(NULL);
|
||||
}
|
||||
state->pExpr = pExpr;
|
||||
return (SNode*)state;
|
||||
|
@ -1157,7 +1165,7 @@ SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode*
|
|||
pEvent->pCol = createPrimaryKeyCol(pCxt, NULL);
|
||||
if (NULL == pEvent->pCol) {
|
||||
nodesDestroyNode((SNode*)pEvent);
|
||||
CHECK_OUT_OF_MEM(NULL);
|
||||
CHECK_MAKE_NODE(NULL);
|
||||
}
|
||||
pEvent->pStartCond = pStartCond;
|
||||
pEvent->pEndCond = pEndCond;
|
||||
|
@ -1172,7 +1180,7 @@ SNode* createCountWindowNode(SAstCreateContext* pCxt, const SToken* pCountToken,
|
|||
pCount->pCol = createPrimaryKeyCol(pCxt, NULL);
|
||||
if (NULL == pCount->pCol) {
|
||||
nodesDestroyNode((SNode*)pCount);
|
||||
CHECK_OUT_OF_MEM(NULL);
|
||||
CHECK_MAKE_NODE(NULL);
|
||||
}
|
||||
pCount->windowCount = taosStr2Int64(pCountToken->z, NULL, 10);
|
||||
pCount->windowSliding = taosStr2Int64(pSlidingToken->z, NULL, 10);
|
||||
|
@ -1188,7 +1196,7 @@ SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode
|
|||
interval->pCol = createPrimaryKeyCol(pCxt, NULL);
|
||||
if (NULL == interval->pCol) {
|
||||
nodesDestroyNode((SNode*)interval);
|
||||
CHECK_OUT_OF_MEM(NULL);
|
||||
CHECK_MAKE_NODE(NULL);
|
||||
}
|
||||
interval->pInterval = pInterval;
|
||||
interval->pOffset = pOffset;
|
||||
|
@ -2086,6 +2094,7 @@ SNode* createAlterTableAddModifyColOptions2(SAstCreateContext* pCxt, SNode* pRea
|
|||
} else {
|
||||
pCxt->errCode = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
|
||||
"not support alter column with option except compress");
|
||||
nodesDestroyNode((SNode*)pStmt);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
@ -3183,9 +3192,15 @@ SNode* createFuncForDelete(SAstCreateContext* pCxt, const char* pFuncName) {
|
|||
pCxt->errCode = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)&pFunc);
|
||||
CHECK_MAKE_NODE(pFunc);
|
||||
snprintf(pFunc->functionName, sizeof(pFunc->functionName), "%s", pFuncName);
|
||||
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pFunc->pParameterList, createPrimaryKeyCol(pCxt, NULL))) {
|
||||
SNode* pCol = createPrimaryKeyCol(pCxt, NULL);
|
||||
if (!pCol) {
|
||||
nodesDestroyNode((SNode*)pFunc);
|
||||
CHECK_OUT_OF_MEM(NULL);
|
||||
return NULL;
|
||||
}
|
||||
pCxt->errCode = nodesListMakeStrictAppend(&pFunc->pParameterList, pCol);
|
||||
if (TSDB_CODE_SUCCESS != pCxt->errCode) {
|
||||
nodesDestroyNode((SNode*)pFunc);
|
||||
return NULL;
|
||||
}
|
||||
return (SNode*)pFunc;
|
||||
}
|
||||
|
@ -3202,7 +3217,7 @@ SNode* createDeleteStmt(SAstCreateContext* pCxt, SNode* pTable, SNode* pWhere) {
|
|||
pStmt->pLastFunc = createFuncForDelete(pCxt, "last");
|
||||
if (NULL == pStmt->pCountFunc || NULL == pStmt->pFirstFunc || NULL == pStmt->pLastFunc) {
|
||||
nodesDestroyNode((SNode*)pStmt);
|
||||
CHECK_OUT_OF_MEM(NULL);
|
||||
CHECK_MAKE_NODE(NULL);
|
||||
}
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
|
|
@ -5268,9 +5268,12 @@ static int32_t translateFill(STranslateContext* pCxt, SSelectStmt* pSelect, SInt
|
|||
return checkFill(pCxt, (SFillNode*)pInterval->pFill, (SValueNode*)pInterval->pInterval, false);
|
||||
}
|
||||
|
||||
static int64_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char unit) {
|
||||
static int32_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char unit, int64_t* pMonth) {
|
||||
int64_t days = -1;
|
||||
convertTimeFromPrecisionToUnit(val, fromPrecision, 'd', &days);
|
||||
int32_t code = convertTimeFromPrecisionToUnit(val, fromPrecision, 'd', &days);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
switch (unit) {
|
||||
case 'b':
|
||||
case 'u':
|
||||
|
@ -5280,15 +5283,19 @@ static int64_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char uni
|
|||
case 'h':
|
||||
case 'd':
|
||||
case 'w':
|
||||
return days / 28;
|
||||
*pMonth = days / 28;
|
||||
return code;
|
||||
case 'n':
|
||||
return val;
|
||||
*pMonth = val;
|
||||
return code;
|
||||
case 'y':
|
||||
return val * 12;
|
||||
*pMonth = val * 12;
|
||||
return code;
|
||||
default:
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
break;
|
||||
}
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* getPrecisionStr(uint8_t precision) {
|
||||
|
@ -5355,11 +5362,23 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode*
|
|||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_OFFSET_UNIT);
|
||||
}
|
||||
bool fixed = !IS_CALENDAR_TIME_DURATION(pOffset->unit) && !valInter;
|
||||
if ((fixed && pOffset->datum.i >= pInter->datum.i) ||
|
||||
(!fixed && getMonthsFromTimeVal(pOffset->datum.i, precision, pOffset->unit) >=
|
||||
getMonthsFromTimeVal(pInter->datum.i, precision, pInter->unit))) {
|
||||
if (fixed && pOffset->datum.i >= pInter->datum.i) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_OFFSET_TOO_BIG);
|
||||
}
|
||||
if (!fixed) {
|
||||
int64_t offsetMonth = 0, intervalMonth = 0;
|
||||
int32_t code = getMonthsFromTimeVal(pOffset->datum.i, precision, pOffset->unit, &offsetMonth);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
code = getMonthsFromTimeVal(pInter->datum.i, precision, pInter->unit, &intervalMonth);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
if (offsetMonth > intervalMonth) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_OFFSET_TOO_BIG);
|
||||
}
|
||||
}
|
||||
|
||||
if (pOffset->unit == 'n' || pOffset->unit == 'y') {
|
||||
convertVarDuration(pOffset, precision);
|
||||
|
@ -8295,7 +8314,11 @@ static int32_t makeIntervalVal(SRetention* pRetension, int8_t precision, SNode**
|
|||
return code;
|
||||
}
|
||||
int64_t timeVal = -1;
|
||||
convertTimeFromPrecisionToUnit(pRetension->freq, precision, pRetension->freqUnit, &timeVal);
|
||||
code = convertTimeFromPrecisionToUnit(pRetension->freq, precision, pRetension->freqUnit, &timeVal);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyNode((SNode*)pVal);
|
||||
return code;
|
||||
}
|
||||
char buf[20] = {0};
|
||||
int32_t len = snprintf(buf, sizeof(buf), "%" PRId64 "%c", timeVal, pRetension->freqUnit);
|
||||
pVal->literal = strndup(buf, len);
|
||||
|
|
Loading…
Reference in New Issue