Merge pull request #26931 from taosdata/fix/syntax
fix(stream): check return value.
This commit is contained in:
commit
6650c6b325
|
@ -290,7 +290,7 @@ static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrap
|
|||
pWrapper->nCols = taosArrayGetSize(pFields);
|
||||
pWrapper->pSchema = taosMemoryCalloc(pWrapper->nCols, sizeof(SSchema));
|
||||
if (NULL == pWrapper->pSchema) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SNode *pNode;
|
||||
|
@ -328,15 +328,18 @@ static bool hasDestPrimaryKey(SSchemaWrapper *pWrapper) {
|
|||
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
|
||||
SNode *pAst = NULL;
|
||||
SQueryPlan *pPlan = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
mInfo("stream:%s to create", pCreate->name);
|
||||
memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||
pObj->createTime = taosGetTimestampMs();
|
||||
pObj->updateTime = pObj->createTime;
|
||||
pObj->version = 1;
|
||||
|
||||
if (pCreate->smaId > 0) {
|
||||
pObj->subTableWithoutMd5 = 1;
|
||||
}
|
||||
|
||||
pObj->smaId = pCreate->smaId;
|
||||
pObj->indexForMultiAggBalance = -1;
|
||||
|
||||
|
@ -360,8 +363,10 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
|
||||
if (pSourceDb == NULL) {
|
||||
mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, terrstr());
|
||||
return terrno;
|
||||
code = terrno;
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
pObj->sourceDbUid = pSourceDb->uid;
|
||||
mndReleaseDb(pMnode, pSourceDb);
|
||||
|
||||
|
@ -369,9 +374,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
|
||||
SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
|
||||
if (pTargetDb == NULL) {
|
||||
mInfo("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr());
|
||||
return terrno;
|
||||
mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr());
|
||||
code = terrno;
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
|
||||
|
||||
if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
|
||||
|
@ -389,12 +396,12 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
pCreate->ast = NULL;
|
||||
|
||||
// deserialize ast
|
||||
if (nodesStringToNode(pObj->ast, &pAst) < 0) {
|
||||
if ((code = nodesStringToNode(pObj->ast, &pAst)) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
// create output schema
|
||||
if (createSchemaByFields(pCreate->pCols, &pObj->outputSchema) != TSDB_CODE_SUCCESS) {
|
||||
if ((code = createSchemaByFields(pCreate->pCols, &pObj->outputSchema)) != TSDB_CODE_SUCCESS) {
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -403,6 +410,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
pObj->outputSchema.nCols += numOfNULL;
|
||||
SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
|
||||
if (!pFullSchema) {
|
||||
code = terrno;
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -410,6 +418,10 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
int32_t dataIndex = 0;
|
||||
for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) {
|
||||
SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
|
||||
if (pos == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (nullIndex >= numOfNULL || i < pos->slotId) {
|
||||
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
|
||||
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
|
||||
|
@ -444,22 +456,31 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
};
|
||||
|
||||
// using ast and param to build physical plan
|
||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
||||
if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
// save physcial plan
|
||||
if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) {
|
||||
if ((code = nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL)) != 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
pObj->tagSchema.nCols = pCreate->numOfTags;
|
||||
if (pCreate->numOfTags) {
|
||||
pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
|
||||
if (pObj->tagSchema.pSchema == NULL) {
|
||||
code = terrno;
|
||||
goto FAIL;
|
||||
}
|
||||
}
|
||||
|
||||
/*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
|
||||
for (int32_t i = 0; i < pCreate->numOfTags; i++) {
|
||||
SField *pField = taosArrayGet(pCreate->pTags, i);
|
||||
if (pField == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
|
||||
pObj->tagSchema.pSchema[i].bytes = pField->bytes;
|
||||
pObj->tagSchema.pSchema[i].flags = pField->flags;
|
||||
|
@ -470,7 +491,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
FAIL:
|
||||
if (pAst != NULL) nodesDestroyNode(pAst);
|
||||
if (pPlan != NULL) qDestroyQueryPlan(pPlan);
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
|
||||
|
@ -575,12 +596,15 @@ int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
|
|||
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
|
||||
SStbObj *pStb = NULL;
|
||||
SDbObj *pDb = NULL;
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
SMCreateStbReq createReq = {0};
|
||||
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
|
||||
createReq.numOfColumns = pStream->outputSchema.nCols;
|
||||
createReq.numOfTags = 1; // group id
|
||||
createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns);
|
||||
TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno);
|
||||
|
||||
// build fields
|
||||
for (int32_t i = 0; i < createReq.numOfColumns; i++) {
|
||||
|
@ -595,6 +619,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
|
|||
if (pStream->tagSchema.nCols == 0) {
|
||||
createReq.numOfTags = 1;
|
||||
createReq.pTags = taosArrayInit_s(sizeof(SField), 1);
|
||||
TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
|
||||
|
||||
// build tags
|
||||
SField *pField = taosArrayGet(createReq.pTags, 0);
|
||||
strcpy(pField->name, "group_id");
|
||||
|
@ -604,6 +630,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
|
|||
} else {
|
||||
createReq.numOfTags = pStream->tagSchema.nCols;
|
||||
createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags);
|
||||
TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno);
|
||||
|
||||
for (int32_t i = 0; i < createReq.numOfTags; i++) {
|
||||
SField *pField = taosArrayGet(createReq.pTags, i);
|
||||
pField->bytes = pStream->tagSchema.pSchema[i].bytes;
|
||||
|
@ -657,7 +685,7 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
|
|||
mndReleaseStb(pMnode, pStb);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols);
|
||||
return 0;
|
||||
return code;
|
||||
|
||||
_OVER:
|
||||
tFreeSMCreateStbReq(&createReq);
|
||||
|
@ -665,7 +693,7 @@ _OVER:
|
|||
mndReleaseDb(pMnode, pDb);
|
||||
|
||||
mDebug("stream:%s failed to create dst stable:%s, code:%s", pStream->name, pStream->targetSTbName, tstrerror(terrno));
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
// 1. stream number check
|
||||
|
@ -709,9 +737,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
char *sql = NULL;
|
||||
int32_t sqlLen = 0;
|
||||
const char *pMsg = "create stream tasks on dnodes";
|
||||
int32_t code = 0;
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
SCMCreateStreamReq createReq = {0};
|
||||
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
|
@ -749,6 +777,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
if (createReq.sql != NULL) {
|
||||
sqlLen = strlen(createReq.sql);
|
||||
sql = taosMemoryMalloc(sqlLen + 1);
|
||||
if (sql == NULL) {
|
||||
code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
memset(sql, 0, sqlLen + 1);
|
||||
memcpy(sql, createReq.sql, sqlLen);
|
||||
}
|
||||
|
@ -942,8 +975,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int
|
|||
|
||||
void *buf = taosMemoryMalloc(tlen);
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
|
@ -1150,7 +1182,11 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
|
|||
SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId));
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
|
||||
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
|
||||
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
|
||||
if (p == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
|
||||
if (pEntry == NULL) {
|
||||
continue;
|
||||
|
@ -1159,8 +1195,12 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
|
|||
if (pEntry->status == TASK_STATUS__STOP) {
|
||||
for (int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) {
|
||||
STaskId *pId = taosArrayGet(pInvalidList, j);
|
||||
if (pId == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pEntry->id.streamId == pId->streamId) {
|
||||
void* px = taosArrayPush(pInvalidList, &pEntry->id);
|
||||
void *px = taosArrayPush(pInvalidList, &pEntry->id);
|
||||
if (px == NULL) {
|
||||
mError("failed to put stream into invalid list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
}
|
||||
|
@ -1243,6 +1283,10 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
|
||||
if (pList == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int64_t now = taosGetTimestampMs();
|
||||
|
||||
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
|
||||
|
@ -2472,14 +2516,15 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
|
|||
SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
|
||||
if (pReqTaskList == NULL) {
|
||||
SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo));
|
||||
doAddReportStreamTask(pList, &req);
|
||||
if (pList != NULL) {
|
||||
doAddReportStreamTask(pList, &req);
|
||||
code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES);
|
||||
if (code) {
|
||||
mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
|
||||
}
|
||||
|
||||
code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES);
|
||||
if (code) {
|
||||
mError("stream:0x%"PRIx64 " failed to put into checkpoint stream", req.streamId);
|
||||
pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
|
||||
}
|
||||
|
||||
pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
|
||||
} else {
|
||||
doAddReportStreamTask(*pReqTaskList, &req);
|
||||
}
|
||||
|
@ -2545,6 +2590,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
|||
SMnode *pMnode = pMsg->info.node;
|
||||
int64_t now = taosGetTimestampMs();
|
||||
SArray *pStreamList = taosArrayInit(4, sizeof(int64_t));
|
||||
if (pStreamList == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
mDebug("start to process consensus-checkpointId in tmr");
|
||||
|
||||
|
@ -2572,6 +2620,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
|||
int64_t streamId = -1;
|
||||
int32_t num = taosArrayGetSize(pInfo->pTaskList);
|
||||
SArray *pList = taosArrayInit(4, sizeof(int32_t));
|
||||
if (pList == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SStreamObj *pStream = NULL;
|
||||
code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
|
||||
|
|
|
@ -365,6 +365,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
|
|||
ps->onlyRef = true;
|
||||
code = tsortAddSource(pInfo->pSortHandle, ps);
|
||||
if (code) {
|
||||
taosMemoryFree(ps);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue