refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-08-02 13:46:24 +08:00
parent 099064af5e
commit 9feacd983f
1 changed files with 20 additions and 10 deletions

View File

@ -106,10 +106,14 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1); strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1);
void* p = taosArrayPush(deleteReq->deleteReqs, &req); void* p = taosArrayPush(deleteReq->deleteReqs, &req);
if (p == NULL) { if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
} }
if (originName) name = originName;
if (originName) {
name = originName;
}
taosMemoryFreeClear(name); taosMemoryFreeClear(name);
} }
@ -190,6 +194,7 @@ int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const
pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name));
if (pCreateTableReq->ctb.stbName == NULL) { // ignore this error code if (pCreateTableReq->ctb.stbName == NULL) { // ignore this error code
tqError("failed to duplicate the stb name:%s, failed to init create-table msg and create req table", stbFullName); tqError("failed to duplicate the stb name:%s, failed to init create-table msg and create req table", stbFullName);
code = TSDB_CODE_OUT_OF_MEMORY;
} }
} }
@ -202,14 +207,14 @@ int32_t createDefaultTagColName(SArray** pColNameList) {
SArray* pTagColNameList = taosArrayInit(1, TSDB_COL_NAME_LEN); SArray* pTagColNameList = taosArrayInit(1, TSDB_COL_NAME_LEN);
if (pTagColNameList == NULL) { if (pTagColNameList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
char tagNameStr[TSDB_COL_NAME_LEN] = "group_id"; char tagNameStr[TSDB_COL_NAME_LEN] = "group_id";
void* p = taosArrayPush(pTagColNameList, tagNameStr); void* p = taosArrayPush(pTagColNameList, tagNameStr);
if (p == NULL) { if (p == NULL) {
taosArrayDestroy(pTagColNameList); taosArrayDestroy(pTagColNameList);
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
*pColNameList = pTagColNameList; *pColNameList = pTagColNameList;
@ -252,15 +257,14 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
SArray* tagArray = taosArrayInit(4, sizeof(STagVal)); SArray* tagArray = taosArrayInit(4, sizeof(STagVal));
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
int32_t code = 0;
tqDebug("s-task:%s build create %d table(s) msg", id, rows); tqDebug("s-task:%s build create %d table(s) msg", id, rows);
int32_t code = 0;
SVCreateTbBatchReq reqs = {0}; SVCreateTbBatchReq reqs = {0};
SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq)); SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
if (NULL == reqs.pArray) { if ((NULL == reqs.pArray) || (tagArray == NULL)) {
tqError("s-task:%s failed to init create table msg, code:%s", id, tstrerror(terrno)); tqError("s-task:%s failed to init create table msg, code:%s", id, tstrerror(terrno));
code = terrno;
goto _end; goto _end;
} }
@ -418,8 +422,8 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
int32_t j = 0, k = 0; int32_t j = 0, k = 0;
SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES); SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES);
if (pFinal == NULL) { if (pFinal == NULL) {
tqError("s-task:%s failed to prepare merge result datablock, code:%s", id, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); tqError("s-task:%s failed to prepare merge result datablock, code:%s", id, tstrerror(terrno));
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
while (j < newLen && k < oldLen) { while (j < newLen && k < oldLen) {
@ -872,6 +876,9 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName); tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal)); SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
if (pTagArray == NULL) {
return terrno;
}
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
code = code =
@ -1167,6 +1174,9 @@ int32_t doRemoveFromCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char
int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid) { int64_t suid) {
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
if (deleteReq.deleteReqs == NULL) {
return terrno;
}
int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr, int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr,
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1); pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);