diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 3b375f7f82..bd6db483f0 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -106,10 +106,14 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1); void* p = taosArrayPush(deleteReq->deleteReqs, &req); if (p == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } } - if (originName) name = originName; + + if (originName) { + name = originName; + } + taosMemoryFreeClear(name); } @@ -190,6 +194,7 @@ int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); if (pCreateTableReq->ctb.stbName == NULL) { // ignore this error code tqError("failed to duplicate the stb name:%s, failed to init create-table msg and create req table", stbFullName); + code = TSDB_CODE_OUT_OF_MEMORY; } } @@ -202,14 +207,14 @@ int32_t createDefaultTagColName(SArray** pColNameList) { SArray* pTagColNameList = taosArrayInit(1, TSDB_COL_NAME_LEN); if (pTagColNameList == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } char tagNameStr[TSDB_COL_NAME_LEN] = "group_id"; void* p = taosArrayPush(pTagColNameList, tagNameStr); if (p == NULL) { taosArrayDestroy(pTagColNameList); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } *pColNameList = pTagColNameList; @@ -252,15 +257,14 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S SArray* tagArray = taosArrayInit(4, sizeof(STagVal)); const char* id = pTask->id.idStr; int32_t vgId = pTask->pMeta->vgId; + int32_t code = 0; tqDebug("s-task:%s build create %d table(s) msg", id, rows); - - int32_t code = 0; - SVCreateTbBatchReq reqs = {0}; SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq)); - if (NULL == reqs.pArray) { + if ((NULL == reqs.pArray) || (tagArray == NULL)) { tqError("s-task:%s failed to init create table msg, code:%s", id, tstrerror(terrno)); + code = terrno; goto _end; } @@ -418,8 +422,8 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c int32_t j = 0, k = 0; SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES); if (pFinal == NULL) { - tqError("s-task:%s failed to prepare merge result datablock, code:%s", id, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return TSDB_CODE_OUT_OF_MEMORY; + tqError("s-task:%s failed to prepare merge result datablock, code:%s", id, tstrerror(terrno)); + return terrno; } while (j < newLen && k < oldLen) { @@ -872,6 +876,9 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName); SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal)); + if (pTagArray == NULL) { + return terrno; + } pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; code = @@ -1167,6 +1174,9 @@ int32_t doRemoveFromCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int64_t suid) { SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; + if (deleteReq.deleteReqs == NULL) { + return terrno; + } int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);