Merge branch '3.0' into enh/TD-30988-3.0

This commit is contained in:
kailixu 2024-07-29 14:38:32 +08:00
commit 57d16e2c3e
17 changed files with 289 additions and 285 deletions

View File

@ -224,7 +224,7 @@ int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeOb
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
tSerializeSCreateDropMQSNodeReq(pReq, contLen, &createReq); (void)tSerializeSCreateDropMQSNodeReq(pReq, contLen, &createReq);
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
@ -252,7 +252,7 @@ static int32_t mndSetCreateQnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, S
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
TAOS_RETURN(code); TAOS_RETURN(code);
} }
tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq); (void)tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq);
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
@ -330,7 +330,7 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char obj[33] = {0}; char obj[33] = {0};
sprintf(obj, "%d", createReq.dnodeId); (void)sprintf(obj, "%d", createReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "createQnode", "", obj, createReq.sql, createReq.sqlLen); auditRecord(pReq, pMnode->clusterId, "createQnode", "", obj, createReq.sql, createReq.sqlLen);
_OVER: _OVER:
@ -383,7 +383,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
TAOS_RETURN(code); TAOS_RETURN(code);
} }
tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq); (void)tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq);
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
@ -459,7 +459,7 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char obj[33] = {0}; char obj[33] = {0};
sprintf(obj, "%d", dropReq.dnodeId); (void)sprintf(obj, "%d", dropReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "dropQnode", "", obj, dropReq.sql, dropReq.sqlLen); auditRecord(pReq, pMnode->clusterId, "dropQnode", "", obj, dropReq.sql, dropReq.sqlLen);
@ -531,7 +531,7 @@ static int32_t mndProcessQnodeListReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
tSerializeSQnodeListRsp(pRsp, rspLen, &qlistRsp); (void)tSerializeSQnodeListRsp(pRsp, rspLen, &qlistRsp);
pReq->info.rspLen = rspLen; pReq->info.rspLen = rspLen;
pReq->info.rsp = pRsp; pReq->info.rsp = pRsp;
@ -556,15 +556,15 @@ static int32_t mndRetrieveQnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
cols = 0; cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false); (void)colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false);
char ep[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0}; char ep[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(ep, pObj->pDnode->ep, pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(ep, pObj->pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)ep, false); (void)colDataSetVal(pColInfo, numOfRows, (const char *)ep, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false); (void)colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
numOfRows++; numOfRows++;
sdbRelease(pSdb, pObj); sdbRelease(pSdb, pObj);

View File

@ -27,7 +27,7 @@ int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg) {
void mndPostProcessQueryMsg(SRpcMsg *pMsg) { void mndPostProcessQueryMsg(SRpcMsg *pMsg) {
if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) return; if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) return;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg); (void)qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg);
} }
int32_t mndProcessQueryMsg(SRpcMsg *pMsg, SQueueInfo* pInfo) { int32_t mndProcessQueryMsg(SRpcMsg *pMsg, SQueueInfo* pInfo) {
@ -134,7 +134,10 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
rsp.msgLen = reqMsg.info.rspLen; rsp.msgLen = reqMsg.info.rspLen;
rsp.msg = reqMsg.info.rsp; rsp.msg = reqMsg.info.rsp;
taosArrayPush(batchRsp.pRsps, &rsp); if (taosArrayPush(batchRsp.pRsps, &rsp) == NULL) {
mError("msg:%p, failed to put array since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType));
}
} }
rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp); rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp);

View File

@ -115,7 +115,7 @@ int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
} else { } else {
pInfo->type = TASK_OUTPUT__TABLE; pInfo->type = TASK_OUTPUT__TABLE;
pInfo->tbSink.stbUid = pStream->targetStbUid; pInfo->tbSink.stbUid = pStream->targetStbUid;
memcpy(pInfo->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); (void)memcpy(pInfo->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pInfo->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); pInfo->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
if (pInfo->tbSink.pSchemaWrapper == NULL) { if (pInfo->tbSink.pSchemaWrapper == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -145,7 +145,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr
int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList); int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);
if (isShuffle) { if (isShuffle) {
memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); (void)memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(pVgs); int32_t numOfVgroups = taosArrayGetSize(pVgs);
@ -363,10 +363,14 @@ static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillh
static void addNewTaskList(SStreamObj* pStream) { static void addNewTaskList(SStreamObj* pStream) {
SArray* pTaskList = taosArrayInit(0, POINTER_BYTES); SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &pTaskList); if (taosArrayPush(pStream->tasks, &pTaskList) == NULL) {
mError("failed to put array");
}
if (pStream->conf.fillHistory) { if (pStream->conf.fillHistory) {
pTaskList = taosArrayInit(0, POINTER_BYTES); pTaskList = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->pHTasksList, &pTaskList); if (taosArrayPush(pStream->pHTasksList, &pTaskList) == NULL) {
mError("failed to put array");
}
} }
} }
@ -584,10 +588,15 @@ static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset)
} }
static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task) { static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task) {
mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task); int32_t code = 0;
if ((code = mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task)) != 0) {
mError("failed bind task to sink task since %s", tstrerror(code));
}
for (int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) { for (int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k); SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k);
streamTaskSetUpstreamInfo(pSinkTask, task); if ((code = streamTaskSetUpstreamInfo(pSinkTask, task)) != 0) {
mError("failed bind task to sink task since %s", tstrerror(code));
}
} }
mDebug("bindTaskToSinkTask taskId:%s to sink task list", task->id.idStr); mDebug("bindTaskToSinkTask taskId:%s to sink task list", task->id.idStr);
} }
@ -604,6 +613,7 @@ static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) {
} }
static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) { static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) {
int32_t code = 0;
SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL); SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
SArray* pSourceTaskList = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL); SArray* pSourceTaskList = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL);
@ -614,12 +624,15 @@ static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, b
if (hasExtraSink) { if (hasExtraSink) {
bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pSourceTask); bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pSourceTask);
} else { } else {
mndSetSinkTaskInfo(pStream, pSourceTask); if ((code = mndSetSinkTaskInfo(pStream, pSourceTask)) != 0) {
mError("failed bind task to sink task since %s", tstrerror(code));
}
} }
} }
} }
static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) { static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
int32_t code = 0;
size_t size = taosArrayGetSize(tasks); size_t size = taosArrayGetSize(tasks);
ASSERT(size >= 2); ASSERT(size >= 2);
SArray* pDownTaskList = taosArrayGetP(tasks, size - 1); SArray* pDownTaskList = taosArrayGetP(tasks, size - 1);
@ -631,7 +644,9 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i); SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i);
pUpTask->info.selfChildId = i - begin; pUpTask->info.selfChildId = i - begin;
streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask); streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask);
streamTaskSetUpstreamInfo(*pDownTask, pUpTask); if ((code = streamTaskSetUpstreamInfo(*pDownTask, pUpTask)) != 0) {
mError("failed bind task to sink task since %s", tstrerror(code));
}
} }
mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(pDownTask))->id.idStr); mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(pDownTask))->id.idStr);
} }

View File

@ -158,7 +158,7 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) {
showObj.id = showId; showObj.id = showId;
showObj.pMnode = pMnode; showObj.pMnode = pMnode;
showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb)); showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb));
memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN); (void)memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN);
tstrncpy(showObj.filterTb, pReq->filterTb, TSDB_TABLE_NAME_LEN); tstrncpy(showObj.filterTb, pReq->filterTb, TSDB_TABLE_NAME_LEN);
int32_t keepTime = tsShellActivityTimer * 6 * 1000; int32_t keepTime = tsShellActivityTimer * 6 * 1000;
@ -270,9 +270,9 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type); mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type);
if (retrieveReq.user[0] != 0) { if (retrieveReq.user[0] != 0) {
memcpy(pReq->info.conn.user, retrieveReq.user, TSDB_USER_LEN); (void)memcpy(pReq->info.conn.user, retrieveReq.user, TSDB_USER_LEN);
} else { } else {
memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1); (void)memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1);
} }
code = -1; code = -1;
if (retrieveReq.db[0] && if (retrieveReq.db[0] &&
@ -303,10 +303,10 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
idata.info.bytes = p->bytes; idata.info.bytes = p->bytes;
idata.info.type = p->type; idata.info.type = p->type;
idata.info.colId = p->colId; idata.info.colId = p->colId;
blockDataAppendColInfo(pBlock, &idata); TAOS_CHECK_RETURN(blockDataAppendColInfo(pBlock, &idata));
} }
blockDataEnsureCapacity(pBlock, rowsToRead); TAOS_CHECK_RETURN(blockDataEnsureCapacity(pBlock, rowsToRead));
if (mndCheckRetrieveFinished(pShow)) { if (mndCheckRetrieveFinished(pShow)) {
mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows); mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows);

View File

@ -212,8 +212,7 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
mndTransRefresh(pMnode, pTrans); mndTransRefresh(pMnode, pTrans);
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta); code = sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta);
code = 0;
_OUT: _OUT:
if (pTrans) mndReleaseTrans(pMnode, pTrans); if (pTrans) mndReleaseTrans(pMnode, pTrans);
@ -222,7 +221,7 @@ _OUT:
static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) { static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
taosThreadMutexLock(&pMgmt->lock); (void)taosThreadMutexLock(&pMgmt->lock);
if (pMgmt->transId == 0) { if (pMgmt->transId == 0) {
goto _OUT; goto _OUT;
} }
@ -232,7 +231,7 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0; pMgmt->transSeq = 0;
pMgmt->errCode = code; pMgmt->errCode = code;
tsem_post(&pMgmt->syncSem); (void)tsem_post(&pMgmt->syncSem);
if (pMgmt->errCode != 0) { if (pMgmt->errCode != 0) {
mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode)); mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
@ -241,7 +240,7 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
} }
_OUT: _OUT:
taosThreadMutexUnlock(&pMgmt->lock); (void)taosThreadMutexUnlock(&pMgmt->lock);
return 0; return 0;
} }
@ -304,7 +303,7 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
} else { } else {
mInfo("vgId:1, sync restore finished"); mInfo("vgId:1, sync restore finished");
} }
mndRefreshUserIpWhiteList(pMnode); (void)mndRefreshUserIpWhiteList(pMnode);
ASSERT(commitIdx == mndSyncAppliedIndex(pFsm)); ASSERT(commitIdx == mndSyncAppliedIndex(pFsm));
} }
@ -350,16 +349,16 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
mInfo("vgId:1, become follower"); mInfo("vgId:1, become follower");
taosThreadMutexLock(&pMgmt->lock); (void)taosThreadMutexLock(&pMgmt->lock);
if (pMgmt->transId != 0) { if (pMgmt->transId != 0) {
mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId); mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0; pMgmt->transSeq = 0;
pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER; pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
tsem_post(&pMgmt->syncSem); (void)tsem_post(&pMgmt->syncSem);
} }
taosThreadMutexUnlock(&pMgmt->lock); (void)taosThreadMutexUnlock(&pMgmt->lock);
} }
static void mndBecomeLearner(const SSyncFSM *pFsm) { static void mndBecomeLearner(const SSyncFSM *pFsm) {
@ -367,16 +366,16 @@ static void mndBecomeLearner(const SSyncFSM *pFsm) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
mInfo("vgId:1, become learner"); mInfo("vgId:1, become learner");
taosThreadMutexLock(&pMgmt->lock); (void)taosThreadMutexLock(&pMgmt->lock);
if (pMgmt->transId != 0) { if (pMgmt->transId != 0) {
mInfo("vgId:1, become learner and post sem, trans:%d, failed to propose since not leader", pMgmt->transId); mInfo("vgId:1, become learner and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0; pMgmt->transSeq = 0;
pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER; pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
tsem_post(&pMgmt->syncSem); (void)tsem_post(&pMgmt->syncSem);
} }
taosThreadMutexUnlock(&pMgmt->lock); (void)taosThreadMutexUnlock(&pMgmt->lock);
} }
static void mndBecomeLeader(const SSyncFSM *pFsm) { static void mndBecomeLeader(const SSyncFSM *pFsm) {
@ -435,12 +434,12 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
int32_t mndInitSync(SMnode *pMnode) { int32_t mndInitSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
taosThreadMutexInit(&pMgmt->lock, NULL); (void)taosThreadMutexInit(&pMgmt->lock, NULL);
taosThreadMutexLock(&pMgmt->lock); (void)taosThreadMutexLock(&pMgmt->lock);
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0; pMgmt->transSeq = 0;
taosThreadMutexUnlock(&pMgmt->lock); (void)taosThreadMutexUnlock(&pMgmt->lock);
SSyncInfo syncInfo = { SSyncInfo syncInfo = {
.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT, .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
@ -477,7 +476,7 @@ int32_t mndInitSync(SMnode *pMnode) {
} }
int32_t code = 0; int32_t code = 0;
tsem_init(&pMgmt->syncSem, 0, 0); (void)tsem_init(&pMgmt->syncSem, 0, 0);
pMgmt->sync = syncOpen(&syncInfo, true); pMgmt->sync = syncOpen(&syncInfo, true);
if (pMgmt->sync <= 0) { if (pMgmt->sync <= 0) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
@ -495,15 +494,15 @@ void mndCleanupSync(SMnode *pMnode) {
syncStop(pMgmt->sync); syncStop(pMgmt->sync);
mInfo("mnode-sync is stopped, id:%" PRId64, pMgmt->sync); mInfo("mnode-sync is stopped, id:%" PRId64, pMgmt->sync);
tsem_destroy(&pMgmt->syncSem); (void)tsem_destroy(&pMgmt->syncSem);
taosThreadMutexDestroy(&pMgmt->lock); (void)taosThreadMutexDestroy(&pMgmt->lock);
memset(pMgmt, 0, sizeof(SSyncMgmt)); memset(pMgmt, 0, sizeof(SSyncMgmt));
} }
void mndSyncCheckTimeout(SMnode *pMnode) { void mndSyncCheckTimeout(SMnode *pMnode) {
mTrace("check sync timeout"); mTrace("check sync timeout");
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
taosThreadMutexLock(&pMgmt->lock); (void)taosThreadMutexLock(&pMgmt->lock);
if (pMgmt->transId != 0) { if (pMgmt->transId != 0) {
int32_t curSec = taosGetTimestampSec(); int32_t curSec = taosGetTimestampSec();
int32_t delta = curSec - pMgmt->transSec; int32_t delta = curSec - pMgmt->transSec;
@ -515,7 +514,7 @@ void mndSyncCheckTimeout(SMnode *pMnode) {
pMgmt->transSeq = 0; pMgmt->transSeq = 0;
terrno = TSDB_CODE_SYN_TIMEOUT; terrno = TSDB_CODE_SYN_TIMEOUT;
pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT; pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT;
tsem_post(&pMgmt->syncSem); (void)tsem_post(&pMgmt->syncSem);
} else { } else {
mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId, mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
pMgmt->transSec, curSec, curSec - pMgmt->transSec, pMgmt->transSeq); pMgmt->transSec, curSec, curSec - pMgmt->transSec, pMgmt->transSeq);
@ -523,7 +522,7 @@ void mndSyncCheckTimeout(SMnode *pMnode) {
} else { } else {
// mTrace("check sync timeout msg, no trans waiting for confirm"); // mTrace("check sync timeout msg, no trans waiting for confirm");
} }
taosThreadMutexUnlock(&pMgmt->lock); (void)taosThreadMutexUnlock(&pMgmt->lock);
} }
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
@ -536,12 +535,12 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
if (req.pCont == NULL) return TSDB_CODE_OUT_OF_MEMORY; if (req.pCont == NULL) return TSDB_CODE_OUT_OF_MEMORY;
memcpy(req.pCont, pRaw, req.contLen); memcpy(req.pCont, pRaw, req.contLen);
taosThreadMutexLock(&pMgmt->lock); (void)taosThreadMutexLock(&pMgmt->lock);
pMgmt->errCode = 0; pMgmt->errCode = 0;
if (pMgmt->transId != 0) { if (pMgmt->transId != 0) {
mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
taosThreadMutexUnlock(&pMgmt->lock); (void)taosThreadMutexUnlock(&pMgmt->lock);
rpcFreeCont(req.pCont); rpcFreeCont(req.pCont);
TAOS_RETURN(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED); TAOS_RETURN(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED);
} }
@ -555,23 +554,24 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
if (code == 0) { if (code == 0) {
mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq); mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq);
pMgmt->transSeq = seq; pMgmt->transSeq = seq;
taosThreadMutexUnlock(&pMgmt->lock); (void)taosThreadMutexUnlock(&pMgmt->lock);
tsem_wait(&pMgmt->syncSem); (void)tsem_wait(&pMgmt->syncSem);
} else if (code > 0) { } else if (code > 0) {
mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId); mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0; pMgmt->transSeq = 0;
taosThreadMutexUnlock(&pMgmt->lock); (void)taosThreadMutexUnlock(&pMgmt->lock);
sdbWriteWithoutFree(pMnode->pSdb, pRaw); code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID); if (code == 0) {
code = 0; sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
}
} else { } else {
mError("trans:%d, failed to proposed since %s", transId, terrstr()); mError("trans:%d, failed to proposed since %s", transId, terrstr());
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0; pMgmt->transSeq = 0;
taosThreadMutexUnlock(&pMgmt->lock); (void)taosThreadMutexUnlock(&pMgmt->lock);
if (terrno == 0) { if (terrno == 0) {
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
} }
@ -600,15 +600,15 @@ void mndSyncStart(SMnode *pMnode) {
void mndSyncStop(SMnode *pMnode) { void mndSyncStop(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
taosThreadMutexLock(&pMgmt->lock); (void)taosThreadMutexLock(&pMgmt->lock);
if (pMgmt->transId != 0) { if (pMgmt->transId != 0) {
mInfo("vgId:1, is stopped and post sem, trans:%d", pMgmt->transId); mInfo("vgId:1, is stopped and post sem, trans:%d", pMgmt->transId);
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->errCode = TSDB_CODE_APP_IS_STOPPING; pMgmt->errCode = TSDB_CODE_APP_IS_STOPPING;
tsem_post(&pMgmt->syncSem); (void)tsem_post(&pMgmt->syncSem);
} }
taosThreadMutexUnlock(&pMgmt->lock); (void)taosThreadMutexUnlock(&pMgmt->lock);
} }
bool mndIsLeader(SMnode *pMnode) { bool mndIsLeader(SMnode *pMnode) {

View File

@ -378,7 +378,7 @@ struct STsdb {
struct { struct {
SVHashTable *ht; SVHashTable *ht;
SArray *arr; SArray *arr;
} *commitInfo; } * commitInfo;
}; };
struct TSDBKEY { struct TSDBKEY {
@ -937,7 +937,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup);
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle); int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle);
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h); int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h);
@ -945,7 +945,6 @@ int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle); int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle);
int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle); int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle);
int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage); int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage);
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);

View File

@ -39,13 +39,15 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
// alloc // alloc
pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader)); pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader));
if (pReader == NULL) { if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; TAOS_CHECK_RETURN(TSDB_CODE_OUT_OF_MEMORY);
goto _err;
} }
pReader->pTq = pTq; pReader->pTq = pTq;
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
pReader->tdbTbList = taosArrayInit(4, sizeof(STablePair)); pReader->tdbTbList = taosArrayInit(4, sizeof(STablePair));
if (pReader->tdbTbList == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _err);
}
STablePair pair1 = {.tbl = pTq->pStreamMeta->pTaskDb, .type = SNAP_DATA_STREAM_TASK}; STablePair pair1 = {.tbl = pTq->pStreamMeta->pTaskDb, .type = SNAP_DATA_STREAM_TASK};
taosArrayPush(pReader->tdbTbList, &pair1); taosArrayPush(pReader->tdbTbList, &pair1);
@ -60,16 +62,14 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
if (code) { if (code) {
tqInfo("vgId:%d, vnode stream-task snapshot reader failed to open, reason: %s", TD_VID(pTq->pVnode), tqInfo("vgId:%d, vnode stream-task snapshot reader failed to open, reason: %s", TD_VID(pTq->pVnode),
tstrerror(code)); tstrerror(code));
taosMemoryFree(pReader); TAOS_CHECK_GOTO(code, NULL, _err);
goto _err;
} }
code = tdbTbcMoveToFirst(pReader->pCur); code = tdbTbcMoveToFirst(pReader->pCur);
if (code) { if (code) {
tqInfo("vgId:%d, vnode stream-task snapshot reader failed to iterate, reason: %s", TD_VID(pTq->pVnode), tqInfo("vgId:%d, vnode stream-task snapshot reader failed to iterate, reason: %s", TD_VID(pTq->pVnode),
tstrerror(code)); tstrerror(code));
taosMemoryFree(pReader); TAOS_CHECK_GOTO(code, NULL, _err);
goto _err;
} }
tqDebug("vgId:%d, vnode stream-task snapshot reader opened", TD_VID(pTq->pVnode)); tqDebug("vgId:%d, vnode stream-task snapshot reader opened", TD_VID(pTq->pVnode));
@ -79,11 +79,14 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
_err: _err:
tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
streamTaskSnapReaderClose(pReader);
*ppReader = NULL; *ppReader = NULL;
return code; return code;
} }
int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) { int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) {
if (pReader == NULL) return 0;
int32_t code = 0; int32_t code = 0;
tqInfo("vgId:%d, vnode stream-task snapshot reader closed", TD_VID(pReader->pTq->pVnode)); tqInfo("vgId:%d, vnode stream-task snapshot reader closed", TD_VID(pReader->pTq->pVnode));
taosArrayDestroy(pReader->tdbTbList); taosArrayDestroy(pReader->tdbTbList);
@ -116,6 +119,10 @@ NextTbl:
break; break;
} else { } else {
pVal = taosMemoryCalloc(1, tLen); pVal = taosMemoryCalloc(1, tLen);
if (pVal == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy(pVal, tVal, tLen); memcpy(pVal, tVal, tLen);
vLen = tLen; vLen = tLen;
} }
@ -174,8 +181,7 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
// alloc // alloc
pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) { if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; TAOS_CHECK_RETURN(TSDB_CODE_OUT_OF_MEMORY);
goto _err;
} }
pWriter->pTq = pTq; pWriter->pTq = pTq;
pWriter->sver = sver; pWriter->sver = sver;
@ -184,12 +190,6 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
*ppWriter = pWriter; *ppWriter = pWriter;
tqDebug("vgId:%d, vnode stream-task snapshot writer opened", TD_VID(pTq->pVnode)); tqDebug("vgId:%d, vnode stream-task snapshot writer opened", TD_VID(pTq->pVnode));
return code; return code;
_err:
tqError("vgId:%d, vnode stream-task snapshot writer failed to write since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppWriter = NULL;
return code;
return 0;
} }
int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
@ -207,8 +207,7 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
if (code) goto _err; if (code) goto _err;
} }
if (tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { if ((code = tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0)) < 0) {
code = -1;
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
goto _err; goto _err;
} }
@ -241,10 +240,11 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
int64_t key[2] = {taskId.streamId, taskId.taskId}; int64_t key[2] = {taskId.streamId, taskId.taskId};
streamMetaWLock(pTq->pStreamMeta); streamMetaWLock(pTq->pStreamMeta);
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), if ((code =
nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) { tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr),
nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn)) < 0) {
streamMetaWUnLock(pTq->pStreamMeta); streamMetaWUnLock(pTq->pStreamMeta);
return -1; return code;
} }
streamMetaWUnLock(pTq->pStreamMeta); streamMetaWUnLock(pTq->pStreamMeta);
} else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) { } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) {

View File

@ -117,7 +117,7 @@ typedef struct {
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
SVnode *pVnode = pTsdb->pVnode; SVnode *pVnode = pTsdb->pVnode;
vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN); (void)vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
int32_t offset = strlen(path); int32_t offset = strlen(path);
snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%scache.rdb", TD_DIRSEP); snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%scache.rdb", TD_DIRSEP);
@ -722,20 +722,14 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
{ {
SLastCol *pLastCol = NULL; SLastCol *pLastCol = NULL;
code = tsdbCacheDeserialize(values_list[0], values_list_sizes[0], &pLastCol); (void)tsdbCacheDeserialize(values_list[0], values_list_sizes[0], &pLastCol);
if (code) {
tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
}
if (NULL != pLastCol) { if (NULL != pLastCol) {
rocksdb_writebatch_delete(wb, keys_list[0], klen); rocksdb_writebatch_delete(wb, keys_list[0], klen);
} }
taosMemoryFreeClear(pLastCol); taosMemoryFreeClear(pLastCol);
pLastCol = NULL; pLastCol = NULL;
code = tsdbCacheDeserialize(values_list[1], values_list_sizes[1], &pLastCol); (void)tsdbCacheDeserialize(values_list[1], values_list_sizes[1], &pLastCol);
if (code) {
tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
}
if (NULL != pLastCol) { if (NULL != pLastCol) {
rocksdb_writebatch_delete(wb, keys_list[1], klen); rocksdb_writebatch_delete(wb, keys_list[1], klen);
} }
@ -748,7 +742,7 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[0], klen); LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[0], klen);
if (h) { if (h) {
erase = true; erase = true;
taosLRUCacheRelease(pTsdb->lruCache, h, erase); (void)taosLRUCacheRelease(pTsdb->lruCache, h, erase);
} }
if (erase) { if (erase) {
taosLRUCacheErase(pTsdb->lruCache, keys_list[0], klen); taosLRUCacheErase(pTsdb->lruCache, keys_list[0], klen);
@ -758,7 +752,7 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[1], klen); h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[1], klen);
if (h) { if (h) {
erase = true; erase = true;
taosLRUCacheRelease(pTsdb->lruCache, h, erase); (void)taosLRUCacheRelease(pTsdb->lruCache, h, erase);
} }
if (erase) { if (erase) {
taosLRUCacheErase(pTsdb->lruCache, keys_list[1], klen); taosLRUCacheErase(pTsdb->lruCache, keys_list[1], klen);
@ -1066,12 +1060,12 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal);
} }
taosLRUCacheRelease(pCache, h, false); (void)taosLRUCacheRelease(pCache, h, false);
} else { } else {
if (!remainCols) { if (!remainCols) {
remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey)); remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
} }
taosArrayPush(remainCols, &(SIdxKey){i, *key}); (void)taosArrayPush(remainCols, &(SIdxKey){i, *key});
} }
} }
@ -1123,10 +1117,12 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
SColVal *pColVal = &updCtx->colVal; SColVal *pColVal = &updCtx->colVal;
SLastCol *pLastCol = NULL; SLastCol *pLastCol = NULL;
code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol); (void)tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
/*
if (code) { if (code) {
tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
} }
*/
SLastCol *PToFree = pLastCol; SLastCol *PToFree = pLastCol;
if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) { if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) {
@ -1242,18 +1238,18 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6
tsdbRowGetKey(&lRow, &tsdbRowKey); tsdbRowGetKey(&lRow, &tsdbRowKey);
STSDBRowIter iter = {0}; STSDBRowIter iter = {0};
tsdbRowIterOpen(&iter, &lRow, pTSchema); (void)tsdbRowIterOpen(&iter, &lRow, pTSchema);
int32_t iCol = 0; int32_t iCol = 0;
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) { for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
taosArrayPush(ctxArray, &updateCtx); (void)taosArrayPush(ctxArray, &updateCtx);
if (!COL_VAL_IS_VALUE(pColVal)) { if (!COL_VAL_IS_VALUE(pColVal)) {
tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0); (void)tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0);
continue; continue;
} }
updateCtx.lflag = LFLAG_LAST; updateCtx.lflag = LFLAG_LAST;
taosArrayPush(ctxArray, &updateCtx); (void)taosArrayPush(ctxArray, &updateCtx);
} }
tsdbRowClose(&iter); tsdbRowClose(&iter);
@ -1277,14 +1273,14 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6
if (COL_VAL_IS_VALUE(&colVal)) { if (COL_VAL_IS_VALUE(&colVal)) {
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal}; SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
taosArrayPush(ctxArray, &updateCtx); (void)taosArrayPush(ctxArray, &updateCtx);
tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter); (void)tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
} }
} }
} }
// 3. do update // 3. do update
tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); (void)tsdbCacheUpdate(pTsdb, suid, uid, ctxArray);
_exit: _exit:
taosMemoryFreeClear(pTSchema); taosMemoryFreeClear(pTSchema);
@ -1317,7 +1313,7 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo
.tsdbRowKey = tsdbRowKey, .tsdbRowKey = tsdbRowKey,
.colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP,
.val = lRow.pBlockData->aTSKEY[lRow.iRow]}))}; .val = lRow.pBlockData->aTSKEY[lRow.iRow]}))};
taosArrayPush(ctxArray, &updateCtx); (void)taosArrayPush(ctxArray, &updateCtx);
} }
TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0); TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
@ -1338,7 +1334,7 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo
tColDataGetValue(pColData, tRow.iRow, &colVal); tColDataGetValue(pColData, tRow.iRow, &colVal);
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal}; SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
taosArrayPush(ctxArray, &updateCtx); (void)taosArrayPush(ctxArray, &updateCtx);
break; break;
} }
} }
@ -1346,15 +1342,15 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo
// 2. prepare last row // 2. prepare last row
STSDBRowIter iter = {0}; STSDBRowIter iter = {0};
tsdbRowIterOpen(&iter, &lRow, pTSchema); (void)tsdbRowIterOpen(&iter, &lRow, pTSchema);
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
taosArrayPush(ctxArray, &updateCtx); (void)taosArrayPush(ctxArray, &updateCtx);
} }
tsdbRowClose(&iter); tsdbRowClose(&iter);
// 3. do update // 3. do update
tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); (void)tsdbCacheUpdate(pTsdb, suid, uid, ctxArray);
_exit: _exit:
taosMemoryFreeClear(pTSchema); taosMemoryFreeClear(pTSchema);
@ -1379,7 +1375,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) { if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID}; SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key}); (void)taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key});
} }
int num_keys = TARRAY_SIZE(remainCols); int num_keys = TARRAY_SIZE(remainCols);
@ -1416,7 +1412,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
if (NULL == lastTmpIndexArray) { if (NULL == lastTmpIndexArray) {
lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t)); lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
} }
taosArrayPush(lastTmpIndexArray, &(i)); (void)taosArrayPush(lastTmpIndexArray, &(i));
lastColIds[lastIndex] = idxKey->key.cid; lastColIds[lastIndex] = idxKey->key.cid;
lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx]; lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx];
lastIndex++; lastIndex++;
@ -1424,7 +1420,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
if (NULL == lastrowTmpIndexArray) { if (NULL == lastrowTmpIndexArray) {
lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t)); lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
} }
taosArrayPush(lastrowTmpIndexArray, &(i)); (void)taosArrayPush(lastrowTmpIndexArray, &(i));
lastrowColIds[lastrowIndex] = idxKey->key.cid; lastrowColIds[lastrowIndex] = idxKey->key.cid;
lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx]; lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx];
lastrowIndex++; lastrowIndex++;
@ -1434,17 +1430,18 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol)); pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
if (lastTmpIndexArray != NULL) { if (lastTmpIndexArray != NULL) {
mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds); (void)mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds);
for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) { for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i), taosArrayGet(lastTmpColArray, i)); (void)taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i),
taosArrayGet(lastTmpColArray, i));
} }
} }
if (lastrowTmpIndexArray != NULL) { if (lastrowTmpIndexArray != NULL) {
mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds); (void)mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds);
for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) { for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i), (void)taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i),
taosArrayGet(lastrowTmpColArray, i)); taosArrayGet(lastrowTmpColArray, i));
} }
} }
@ -1586,10 +1583,7 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
SLRUCache *pCache = pTsdb->lruCache; SLRUCache *pCache = pTsdb->lruCache;
for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) { for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
SLastCol *pLastCol = NULL; SLastCol *pLastCol = NULL;
code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol); (void)tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
if (code) {
tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
}
SLastCol *PToFree = pLastCol; SLastCol *PToFree = pLastCol;
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j]; SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
if (pLastCol) { if (pLastCol) {
@ -1682,19 +1676,19 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
TAOS_CHECK_RETURN(reallocVarDataVal(&lastCol.rowKey.pks[j])); TAOS_CHECK_RETURN(reallocVarDataVal(&lastCol.rowKey.pks[j]));
} }
TAOS_CHECK_RETURN(reallocVarData(&lastCol.colVal)); TAOS_CHECK_RETURN(reallocVarData(&lastCol.colVal));
taosArrayPush(pLastArray, &lastCol); (void)taosArrayPush(pLastArray, &lastCol);
taosLRUCacheRelease(pCache, h, false); (void)taosLRUCacheRelease(pCache, h, false);
} else { } else {
SLastCol noneCol = {.rowKey.ts = TSKEY_MIN, SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
.colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
taosArrayPush(pLastArray, &noneCol); (void)taosArrayPush(pLastArray, &noneCol);
if (!remainCols) { if (!remainCols) {
remainCols = taosArrayInit(num_keys, sizeof(SIdxKey)); remainCols = taosArrayInit(num_keys, sizeof(SIdxKey));
} }
taosArrayPush(remainCols, &(SIdxKey){i, key}); (void)taosArrayPush(remainCols, &(SIdxKey){i, key});
} }
} }
@ -1721,7 +1715,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
} }
taosArraySet(pLastArray, idxKey->idx, &lastCol); taosArraySet(pLastArray, idxKey->idx, &lastCol);
taosLRUCacheRelease(pCache, h, false); (void)taosLRUCacheRelease(pCache, h, false);
taosArrayRemove(remainCols, i); taosArrayRemove(remainCols, i);
} else { } else {
@ -1810,10 +1804,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
SLastCol *pLastCol = NULL; SLastCol *pLastCol = NULL;
code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol); (void)tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
if (code) {
tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
}
(void)taosThreadMutexLock(&pTsdb->rCache.rMutex); (void)taosThreadMutexLock(&pTsdb->rCache.rMutex);
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
rocksdb_writebatch_delete(wb, keys_list[i], klen); rocksdb_writebatch_delete(wb, keys_list[i], klen);
@ -1821,10 +1812,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
taosMemoryFreeClear(pLastCol); taosMemoryFreeClear(pLastCol);
pLastCol = NULL; pLastCol = NULL;
code = tsdbCacheDeserialize(values_list[i + num_keys], values_list_sizes[i + num_keys], &pLastCol); (void)tsdbCacheDeserialize(values_list[i + num_keys], values_list_sizes[i + num_keys], &pLastCol);
if (code) {
tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
}
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen); rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen);
} }
@ -1846,7 +1834,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) { if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
erase = true; erase = true;
} }
taosLRUCacheRelease(pTsdb->lruCache, h, erase); (void)taosLRUCacheRelease(pTsdb->lruCache, h, erase);
} }
if (erase) { if (erase) {
taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen); taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
@ -1862,7 +1850,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) { if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
erase = true; erase = true;
} }
taosLRUCacheRelease(pTsdb->lruCache, h, erase); (void)taosLRUCacheRelease(pTsdb->lruCache, h, erase);
} }
if (erase) { if (erase) {
taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen); taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen);
@ -1982,7 +1970,7 @@ static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
SDelData *pDelData = pTbData ? pTbData->pHead : NULL; SDelData *pDelData = pTbData ? pTbData->pHead : NULL;
for (; pDelData; pDelData = pDelData->pNext) { for (; pDelData; pDelData = pDelData->pNext) {
taosArrayPush(aDelData, pDelData); (void)taosArrayPush(aDelData, pDelData);
} }
return code; return code;
@ -2005,7 +1993,7 @@ static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid)
if (!ppInfo) { if (!ppInfo) {
pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo)); pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo));
if (pInfo) { if (pInfo) {
tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES); (void)tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
} }
return pInfo; return pInfo;
@ -2117,11 +2105,11 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);*/ TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);*/
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pInfo->pTombData, &delData); (void)taosArrayPush(pInfo->pTombData, &delData);
} }
} }
tTombBlockDestroy(&block); (void)tTombBlockDestroy(&block);
if (finished) { if (finished) {
TAOS_RETURN(code); TAOS_RETURN(code);
@ -2312,7 +2300,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
state->pr->pCurFileSet = state->pFileSet; state->pr->pCurFileSet = state->pFileSet;
loadDataTomb(state->pr, state->pr->pFileReader); (void)loadDataTomb(state->pr, state->pr->pFileReader);
TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray), &lino, _err); TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray), &lino, _err);
} }
@ -2329,7 +2317,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
SBrinBlk *pBrinBlk = &pBlkArray->data[i]; SBrinBlk *pBrinBlk = &pBlkArray->data[i];
if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) { if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) {
if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) { if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) {
taosArrayPush(state->pIndexList, pBrinBlk); (void)taosArrayPush(state->pIndexList, pBrinBlk);
} }
} else if (state->suid > pBrinBlk->maxTbid.suid || } else if (state->suid > pBrinBlk->maxTbid.suid ||
(state->suid == pBrinBlk->maxTbid.suid && state->uid > pBrinBlk->maxTbid.uid)) { (state->suid == pBrinBlk->maxTbid.suid && state->uid > pBrinBlk->maxTbid.uid)) {
@ -2385,7 +2373,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
if (!state->pLastRow) { if (!state->pLastRow) {
if (state->pLastIter) { if (state->pLastIter) {
lastIterClose(&state->pLastIter); (void)lastIterClose(&state->pLastIter);
} }
clearLastFileSet(state); clearLastFileSet(state);
@ -2493,7 +2481,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
if (!state->pLastRow) { if (!state->pLastRow) {
if (state->pLastIter) { if (state->pLastIter) {
lastIterClose(&state->pLastIter); (void)lastIterClose(&state->pLastIter);
} }
*ppRow = &state->row; *ppRow = &state->row;
@ -2517,7 +2505,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
} else { } else {
// TODO: merge rows and *ppRow = mergedRow // TODO: merge rows and *ppRow = mergedRow
SRowMerger *pMerger = &state->rowMerger; SRowMerger *pMerger = &state->rowMerger;
tsdbRowMergerInit(pMerger, state->pTSchema); (void)tsdbRowMergerInit(pMerger, state->pTSchema);
TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema), &lino, _err); TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema), &lino, _err);
TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema), &lino, _err); TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema), &lino, _err);
@ -2682,7 +2670,7 @@ int32_t clearNextRowFromFS(void *iter) {
} }
if (state->pLastIter) { if (state->pLastIter) {
lastIterClose(&state->pLastIter); (void)lastIterClose(&state->pLastIter);
} }
if (state->pBlockData) { if (state->pBlockData) {
@ -2715,7 +2703,7 @@ int32_t clearNextRowFromFS(void *iter) {
static void clearLastFileSet(SFSNextRowIter *state) { static void clearLastFileSet(SFSNextRowIter *state) {
if (state->pLastIter) { if (state->pLastIter) {
lastIterClose(&state->pLastIter); (void)lastIterClose(&state->pLastIter);
} }
if (state->pBlockData) { if (state->pBlockData) {
@ -2724,7 +2712,7 @@ static void clearLastFileSet(SFSNextRowIter *state) {
} }
if (state->pr->pFileReader) { if (state->pr->pFileReader) {
tsdbDataFileReaderClose(&state->pr->pFileReader); (void)tsdbDataFileReaderClose(&state->pr->pFileReader);
state->pr->pFileReader = NULL; state->pr->pFileReader = NULL;
state->pr->pCurFileSet = NULL; state->pr->pCurFileSet = NULL;
@ -2814,7 +2802,7 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 3; ++i) {
if (pIter->input[i].nextRowClearFn) { if (pIter->input[i].nextRowClearFn) {
pIter->input[i].nextRowClearFn(pIter->input[i].iter); (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
} }
} }
@ -2898,7 +2886,7 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI
pInfo->pTombData = taosArrayInit(4, sizeof(SDelData)); pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
} }
taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData); (void)taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData);
size_t delSize = TARRAY_SIZE(pInfo->pTombData); size_t delSize = TARRAY_SIZE(pInfo->pTombData);
if (delSize > 0) { if (delSize > 0) {
@ -2944,7 +2932,7 @@ static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray,
int16_t slotId = slotIds[i]; int16_t slotId = slotIds[i];
SLastCol col = {.rowKey.ts = 0, SLastCol col = {.rowKey.ts = 0,
.colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)}; .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
taosArrayPush(pColArray, &col); (void)taosArrayPush(pColArray, &col);
} }
*ppColArray = pColArray; *ppColArray = pColArray;
@ -2998,18 +2986,18 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
} }
for (int i = 0; i < nCols; ++i) { for (int i = 0; i < nCols; ++i) {
taosArrayPush(aColArray, &aCols[i]); (void)taosArrayPush(aColArray, &aCols[i]);
} }
STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX}; STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX};
// inverse iterator // inverse iterator
CacheNextRowIter iter = {0}; CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr); (void)nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
do { do {
TSDBROW *pRow = NULL; TSDBROW *pRow = NULL;
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray)); (void)nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
if (!pRow) { if (!pRow) {
break; break;
@ -3142,7 +3130,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
} }
*ppLastArray = pColArray; *ppLastArray = pColArray;
nextRowIterClose(&iter); (void)nextRowIterClose(&iter);
taosArrayDestroy(aColArray); taosArrayDestroy(aColArray);
TAOS_RETURN(code); TAOS_RETURN(code);
@ -3184,16 +3172,16 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
} }
for (int i = 0; i < nCols; ++i) { for (int i = 0; i < nCols; ++i) {
taosArrayPush(aColArray, &aCols[i]); (void)taosArrayPush(aColArray, &aCols[i]);
} }
// inverse iterator // inverse iterator
CacheNextRowIter iter = {0}; CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr); (void)nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
do { do {
TSDBROW *pRow = NULL; TSDBROW *pRow = NULL;
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray)); (void)nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
if (!pRow) { if (!pRow) {
break; break;
@ -3266,7 +3254,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
} }
*ppLastArray = pColArray; *ppLastArray = pColArray;
nextRowIterClose(&iter); (void)nextRowIterClose(&iter);
taosArrayDestroy(aColArray); taosArrayDestroy(aColArray);
TAOS_RETURN(code); TAOS_RETURN(code);
@ -3286,13 +3274,7 @@ _err:
TAOS_RETURN(code); TAOS_RETURN(code);
} }
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { taosLRUCacheRelease(pCache, h, false); }
int32_t code = 0;
taosLRUCacheRelease(pCache, h, false);
return code;
}
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) { void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
taosLRUCacheSetCapacity(pVnode->pTsdb->lruCache, capacity); taosLRUCacheSetCapacity(pVnode->pTsdb->lruCache, capacity);

View File

@ -143,9 +143,9 @@ SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg);
int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst); int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst);
STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer); int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer, STaskDbWrapper** ppTaskDb);
void taskDbDestroy(void* pBackend, bool flush); void taskDbDestroy(void* pBackend, bool flush);
void taskDbDestroy2(void* pBackend); void taskDbDestroy2(void* pBackend);
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId); void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId);
@ -252,7 +252,7 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId);
SBkdMgt* bkdMgtCreate(char* path); int32_t bkdMgtCreate(char* path, SBkdMgt **bm);
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path); int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path);
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name); int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name);
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);

View File

@ -2525,35 +2525,35 @@ _EXIT:
return NULL; return NULL;
} }
STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer) { int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer, STaskDbWrapper** ppTaskDb) {
char* statePath = NULL; char* statePath = NULL;
char* dbPath = NULL; char* dbPath = NULL;
int code = 0; int code = 0;
terrno = 0;
if ((code = restoreCheckpointData(path, key, chkptId, &statePath, &dbPath, processVer)) < 0) { if ((code = restoreCheckpointData(path, key, chkptId, &statePath, &dbPath, processVer)) < 0) {
terrno = code;
stError("failed to restore checkpoint data, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, stError("failed to restore checkpoint data, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key,
chkptId, tstrerror(terrno)); chkptId, tstrerror(code));
return NULL; return code;
} }
STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath); STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath);
if (pTaskDb != NULL) { if (pTaskDb != NULL) {
int64_t chkpId = -1, ver = -1; int64_t chkpId = -1, ver = -1;
if ((code = chkpLoadExtraInfo(dbPath, &chkpId, &ver) == 0)) { if ((code = chkpLoadExtraInfo(dbPath, &chkpId, &ver)) == 0) {
*processVer = ver; *processVer = ver;
} else { } else {
terrno = code;
stError("failed to load extra info, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, chkptId, stError("failed to load extra info, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, chkptId,
tstrerror(terrno)); tstrerror(code));
taskDbDestroy(pTaskDb, false); taskDbDestroy(pTaskDb, false);
return NULL; return code;
} }
} else {
code = TSDB_CODE_INVALID_PARA;
} }
taosMemoryFree(dbPath); taosMemoryFree(dbPath);
taosMemoryFree(statePath); taosMemoryFree(statePath);
return pTaskDb; *ppTaskDb = pTaskDb;
return code;
} }
void taskDbDestroy(void* pDb, bool flush) { void taskDbDestroy(void* pDb, bool flush) {
@ -2794,8 +2794,10 @@ int32_t streamStateCvtDataFormat(char* path, char* key, void* pCfInst) {
int32_t code = 0; int32_t code = 0;
int64_t processVer = -1; int64_t processVer = -1;
STaskDbWrapper* pTaskDb = taskDbOpen(path, key, 0, &processVer); STaskDbWrapper* pTaskDb = NULL;
RocksdbCfInst* pSrcBackend = pCfInst;
code = taskDbOpen(path, key, 0, &processVer, &pTaskDb);
RocksdbCfInst* pSrcBackend = pCfInst;
for (int i = 0; i < nCf; i++) { for (int i = 0; i < nCf; i++) {
rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i]; rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i];
@ -4626,10 +4628,11 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
void dbChkpDestroy(SDbChkp* pChkp); void dbChkpDestroy(SDbChkp* pChkp);
SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { int32_t dbChkpCreate(char* path, int64_t initChkpId, SDbChkp** ppChkp) {
int32_t code = 0;
SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp)); SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp));
if (p == NULL) { if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT; goto _EXIT;
} }
@ -4637,41 +4640,41 @@ SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) {
p->preCkptId = -1; p->preCkptId = -1;
p->pSST = taosArrayInit(64, sizeof(void*)); p->pSST = taosArrayInit(64, sizeof(void*));
if (p->pSST == NULL) { if (p->pSST == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
dbChkpDestroy(p); dbChkpDestroy(p);
return NULL; return code;
} }
p->path = path; p->path = path;
p->len = strlen(path) + 128; p->len = strlen(path) + 128;
p->buf = taosMemoryCalloc(1, p->len); p->buf = taosMemoryCalloc(1, p->len);
if (p->buf == NULL) { if (p->buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT; goto _EXIT;
} }
p->idx = 0; p->idx = 0;
p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (p->pSstTbl[0] == NULL) { if (p->pSstTbl[0] == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT; goto _EXIT;
} }
p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (p->pSstTbl[1] == NULL) { if (p->pSstTbl[1] == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT; goto _EXIT;
} }
p->pAdd = taosArrayInit(64, sizeof(void*)); p->pAdd = taosArrayInit(64, sizeof(void*));
if (p->pAdd == NULL) { if (p->pAdd == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT; goto _EXIT;
} }
p->pDel = taosArrayInit(64, sizeof(void*)); p->pDel = taosArrayInit(64, sizeof(void*));
if (p->pDel == NULL) { if (p->pDel == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _EXIT; goto _EXIT;
} }
@ -4679,15 +4682,15 @@ SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) {
taosThreadRwlockInit(&p->rwLock, NULL); taosThreadRwlockInit(&p->rwLock, NULL);
SArray* list = NULL; SArray* list = NULL;
int32_t code = dbChkpGetDelta(p, initChkpId, list); code = dbChkpGetDelta(p, initChkpId, list);
if (code != 0) { if (code != 0) {
goto _EXIT; goto _EXIT;
} }
*ppChkp = p;
return p; return code;
_EXIT: _EXIT:
dbChkpDestroy(p); dbChkpDestroy(p);
return NULL; return code;
} }
void dbChkpDestroy(SDbChkp* pChkp) { void dbChkpDestroy(SDbChkp* pChkp) {
@ -4880,35 +4883,36 @@ _ERROR:
return code; return code;
} }
SBkdMgt* bkdMgtCreate(char* path) { int32_t bkdMgtCreate(char* path, SBkdMgt** mgt) {
terrno = 0; int32_t code = 0;
SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt)); SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt));
if (p == NULL) { if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return code;
} }
p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (p->pDbChkpTbl == NULL) { if (p->pDbChkpTbl == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
bkdMgtDestroy(p); bkdMgtDestroy(p);
return NULL; return code;
} }
p->path = taosStrdup(path); p->path = taosStrdup(path);
if (p->path == NULL) { if (p->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
bkdMgtDestroy(p); bkdMgtDestroy(p);
return NULL; return code;
} }
if (taosThreadRwlockInit(&p->rwLock, NULL) != 0) { if (taosThreadRwlockInit(&p->rwLock, NULL) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
bkdMgtDestroy(p); bkdMgtDestroy(p);
return NULL; return code;
} }
*mgt = p;
return p; return code;
} }
void bkdMgtDestroy(SBkdMgt* bm) { void bkdMgtDestroy(SBkdMgt* bm) {
@ -4949,11 +4953,11 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
return code; return code;
} }
SDbChkp* p = dbChkpCreate(path, chkpId); SDbChkp* p = NULL;
if (p == NULL) { code = dbChkpCreate(path, chkpId, &p);
if (code != 0) {
taosMemoryFree(path); taosMemoryFree(path);
taosThreadRwlockUnlock(&bm->rwLock); taosThreadRwlockUnlock(&bm->rwLock);
code = terrno;
return code; return code;
} }
@ -4986,8 +4990,9 @@ int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path) {
taosThreadRwlockWrlock(&bm->rwLock); taosThreadRwlockWrlock(&bm->rwLock);
SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task)); SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task));
if (pp == NULL) { if (pp == NULL) {
SDbChkp* p = dbChkpCreate(path, 0); SDbChkp* p = NULL;
if (p != NULL) { code = dbChkpCreate(path, 0, &p);
if (code != 0) {
taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void*)); taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void*));
code = 0; code = 0;
} }

View File

@ -68,12 +68,12 @@ static void streamMetaEnvInit() {
} }
} }
void streamMetaInit() { (void) taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } void streamMetaInit() { (void)taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
void streamMetaCleanup() { void streamMetaCleanup() {
(void) taosCloseRef(streamBackendId); (void)taosCloseRef(streamBackendId);
(void) taosCloseRef(streamBackendCfWrapperId); (void)taosCloseRef(streamBackendCfWrapperId);
(void) taosCloseRef(streamMetaId); (void)taosCloseRef(streamMetaId);
metaRefMgtCleanup(); metaRefMgtCleanup();
streamTimerCleanUp(); streamTimerCleanUp();
@ -128,12 +128,12 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
code = taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*)); code = taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*));
if (code) { if (code) {
stError("vgId:%d failed to put into metaRef table, rid:%" PRId64, (int32_t) vgId, *rid); stError("vgId:%d failed to put into metaRef table, rid:%" PRId64, (int32_t)vgId, *rid);
return code; return code;
} }
} else { } else {
SArray* list = *(SArray**)p; SArray* list = *(SArray**)p;
void* px = taosArrayPush(list, &rid); void* px = taosArrayPush(list, &rid);
if (px == NULL) { if (px == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} }
@ -186,7 +186,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
code = tdbTbcMoveToFirst(pCur); code = tdbTbcMoveToFirst(pCur);
if (code) { if (code) {
(void) tdbTbcClose(pCur); (void)tdbTbcClose(pCur);
stError("vgId:%d failed to open stream meta file cursor, not perform compatible check", pMeta->vgId); stError("vgId:%d failed to open stream meta file cursor, not perform compatible check", pMeta->vgId);
return ret; return ret;
} }
@ -215,7 +215,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
(void) tdbTbcClose(pCur); (void)tdbTbcClose(pCur);
return ret; return ret;
} }
@ -276,6 +276,7 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
} }
int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) { int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) {
int32_t code = 0;
int64_t chkpId = pTask->chkInfo.checkpointId; int64_t chkpId = pTask->chkInfo.checkpointId;
streamMutexLock(&pMeta->backendMutex); streamMutexLock(&pMeta->backendMutex);
@ -299,8 +300,8 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
STaskDbWrapper* pBackend = NULL; STaskDbWrapper* pBackend = NULL;
int64_t processVer = -1; int64_t processVer = -1;
while (1) { while (1) {
pBackend = taskDbOpen(pMeta->path, key, chkpId, &processVer); code = taskDbOpen(pMeta->path, key, chkpId, &processVer, &pBackend);
if (pBackend != NULL) { if (code == 0) {
break; break;
} }
@ -319,7 +320,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
if (processVer != -1) pTask->chkInfo.processedVer = processVer; if (processVer != -1) pTask->chkInfo.processedVer = processVer;
int32_t code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*)); code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
if (code) { if (code) {
stError("s-task:0x%x failed to put taskDb backend, code:out of memory", pTask->id.taskId); stError("s-task:0x%x failed to put taskDb backend, code:out of memory", pTask->id.taskId);
} }
@ -469,8 +470,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
pMeta->bkdChkptMgt = bkdMgtCreate(tpath); code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt);
if (pMeta->bkdChkptMgt == NULL) { if (code != 0) {
goto _err; goto _err;
} }
@ -485,7 +486,7 @@ _err:
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
if (pMeta->pTaskDb) (void)tdbTbClose(pMeta->pTaskDb); if (pMeta->pTaskDb) (void)tdbTbClose(pMeta->pTaskDb);
if (pMeta->pCheckpointDb) (void)tdbTbClose(pMeta->pCheckpointDb); if (pMeta->pCheckpointDb) (void)tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) (void) tdbClose(pMeta->db); if (pMeta->db) (void)tdbClose(pMeta->db);
if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo); if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks); if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet); if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
@ -531,7 +532,7 @@ void streamMetaClear(SStreamMeta* pMeta) {
// release the ref by timer // release the ref by timer
if (p->info.delaySchedParam != 0 && p->info.fillHistory == 0) { // one more ref in timer if (p->info.delaySchedParam != 0 && p->info.fillHistory == 0) { // one more ref in timer
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt); stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt);
(void) taosTmrStop(p->schedInfo.pDelayTimer); (void)taosTmrStop(p->schedInfo.pDelayTimer);
p->info.delaySchedParam = 0; p->info.delaySchedParam = 0;
streamMetaReleaseTask(pMeta, p); streamMetaReleaseTask(pMeta, p);
} }
@ -566,7 +567,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
if (pMeta == NULL) { if (pMeta == NULL) {
return; return;
} }
(void) taosRemoveRef(streamMetaId, pMeta->rid); (void)taosRemoveRef(streamMetaId, pMeta->rid);
} }
void streamMetaCloseImpl(void* arg) { void streamMetaCloseImpl(void* arg) {
@ -583,10 +584,10 @@ void streamMetaCloseImpl(void* arg) {
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
// already log the error, ignore here // already log the error, ignore here
(void) tdbAbort(pMeta->db, pMeta->txn); (void)tdbAbort(pMeta->db, pMeta->txn);
(void) tdbTbClose(pMeta->pTaskDb); (void)tdbTbClose(pMeta->pTaskDb);
(void) tdbTbClose(pMeta->pCheckpointDb); (void)tdbTbClose(pMeta->pCheckpointDb);
(void) tdbClose(pMeta->db); (void)tdbClose(pMeta->db);
taosArrayDestroy(pMeta->pTaskList); taosArrayDestroy(pMeta->pTaskList);
taosArrayDestroy(pMeta->chkpSaved); taosArrayDestroy(pMeta->chkpSaved);
@ -610,7 +611,7 @@ void streamMetaCloseImpl(void* arg) {
bkdMgtDestroy(pMeta->bkdChkptMgt); bkdMgtDestroy(pMeta->bkdChkptMgt);
pMeta->role = NODE_ROLE_UNINIT; pMeta->role = NODE_ROLE_UNINIT;
(void) taosThreadRwlockDestroy(&pMeta->lock); (void)taosThreadRwlockDestroy(&pMeta->lock);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
stDebug("vgId:%d end to close stream meta", vgId); stDebug("vgId:%d end to close stream meta", vgId);
@ -691,13 +692,13 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
p = taosArrayPush(pMeta->pTaskList, &pTask->id); p = taosArrayPush(pMeta->pTaskList, &pTask->id);
if (p == NULL) { if (p == NULL) {
stError("s-task:0x%"PRIx64" failed to register task into meta-list, code: out of memory", id.taskId); stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES);
if (code) { if (code) {
stError("s-task:0x%"PRIx64" failed to register task into meta-list, code: out of memory", id.taskId); stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
return code; return code;
} }
@ -710,7 +711,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
} }
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
(void) atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
} }
*pAdded = true; *pAdded = true;
@ -779,7 +780,7 @@ static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
(void) streamTaskSendCheckpointSourceRsp(pTask); (void)streamTaskSendCheckpointSourceRsp(pTask);
} }
return 0; return 0;
} }
@ -802,7 +803,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
} }
// handle the dropping event // handle the dropping event
(void) streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL); (void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
} else { } else {
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
@ -841,12 +842,12 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
pTask = *ppTask; pTask = *ppTask;
// it is an fill-history task, remove the related stream task's id that points to it // it is an fill-history task, remove the related stream task's id that points to it
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
(void) atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); (void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
} }
(void) taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); (void)taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
(void) streamMetaRemoveTask(pMeta, &id); (void)streamMetaRemoveTask(pMeta, &id);
ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList)); ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList));
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
@ -854,7 +855,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
ASSERT(pTask->status.timerActive == 0); ASSERT(pTask->status.timerActive == 0);
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt); stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt);
(void) taosTmrStop(pTask->schedInfo.pDelayTimer); (void)taosTmrStop(pTask->schedInfo.pDelayTimer);
pTask->info.delaySchedParam = 0; pTask->info.delaySchedParam = 0;
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} }
@ -915,7 +916,7 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
code = tdbTbcMoveToFirst(pCur); code = tdbTbcMoveToFirst(pCur);
if (code) { if (code) {
(void) tdbTbcClose(pCur); (void)tdbTbcClose(pCur);
stError("failed to open stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId); stError("failed to open stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
return checkpointId; return checkpointId;
} }
@ -953,7 +954,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
SDecoder decoder; SDecoder decoder;
int32_t vgId = 0; int32_t vgId = 0;
int32_t code = 0; int32_t code = 0;
SArray* pRecycleList = NULL; SArray* pRecycleList = NULL;
if (pMeta == NULL) { if (pMeta == NULL) {
return; return;
@ -975,7 +976,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (code) { if (code) {
stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno)); stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
taosArrayDestroy(pRecycleList); taosArrayDestroy(pRecycleList);
(void) tdbTbcClose(pCur); (void)tdbTbcClose(pCur);
return; return;
} }
@ -1008,7 +1009,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
STaskId id = streamTaskGetTaskId(pTask); STaskId id = streamTaskGetTaskId(pTask);
(void) taosArrayPush(pRecycleList, &id); (void)taosArrayPush(pRecycleList, &id);
int32_t total = taosArrayGetSize(pRecycleList); int32_t total = taosArrayGetSize(pRecycleList);
stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total); stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
@ -1029,7 +1030,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
continue; continue;
} }
(void) taosArrayPush(pMeta->pTaskList, &pTask->id); (void)taosArrayPush(pMeta->pTaskList, &pTask->id);
} else { } else {
// todo this should replace the existed object put by replay creating stream task msg from mnode // todo this should replace the existed object put by replay creating stream task msg from mnode
stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId); stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
@ -1039,17 +1040,17 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) != 0) { if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) != 0) {
stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno)); stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno));
(void) taosArrayPop(pMeta->pTaskList); (void)taosArrayPop(pMeta->pTaskList);
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
continue; continue;
} }
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
(void) atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
} }
if (streamTaskShouldPause(pTask)) { if (streamTaskShouldPause(pTask)) {
(void) atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); (void)atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
} }
ASSERT(pTask->status.downstreamReady == 0); ASSERT(pTask->status.downstreamReady == 0);
@ -1065,7 +1066,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (taosArrayGetSize(pRecycleList) > 0) { if (taosArrayGetSize(pRecycleList) > 0) {
for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
STaskId* pId = taosArrayGet(pRecycleList, i); STaskId* pId = taosArrayGet(pRecycleList, i);
(void) streamMetaRemoveTask(pMeta, pId); (void)streamMetaRemoveTask(pMeta, pId);
} }
} }
@ -1093,7 +1094,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->status.timerActive >= 1) { if (pTask->status.timerActive >= 1) {
stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart, set closing again", pTask->id.idStr, pMeta->vgId); stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart, set closing again", pTask->id.idStr, pMeta->vgId);
(void) streamTaskStop(pTask); (void)streamTaskStop(pTask);
inTimer = true; inTimer = true;
} }
} }
@ -1126,7 +1127,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
stDebug("vgId:%d s-task:%s set task closing flag", vgId, pTask->id.idStr); stDebug("vgId:%d s-task:%s set task closing flag", vgId, pTask->id.idStr);
(void) streamTaskStop(pTask); (void)streamTaskStop(pTask);
} }
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
@ -1173,7 +1174,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
void streamMetaRLock(SStreamMeta* pMeta) { void streamMetaRLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-rlock", pMeta->vgId); // stTrace("vgId:%d meta-rlock", pMeta->vgId);
(void) taosThreadRwlockRdlock(&pMeta->lock); (void)taosThreadRwlockRdlock(&pMeta->lock);
} }
void streamMetaRUnLock(SStreamMeta* pMeta) { void streamMetaRUnLock(SStreamMeta* pMeta) {
@ -1188,13 +1189,13 @@ void streamMetaRUnLock(SStreamMeta* pMeta) {
void streamMetaWLock(SStreamMeta* pMeta) { void streamMetaWLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wlock", pMeta->vgId); // stTrace("vgId:%d meta-wlock", pMeta->vgId);
(void) taosThreadRwlockWrlock(&pMeta->lock); (void)taosThreadRwlockWrlock(&pMeta->lock);
// stTrace("vgId:%d meta-wlock completed", pMeta->vgId); // stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
} }
void streamMetaWUnLock(SStreamMeta* pMeta) { void streamMetaWUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wunlock", pMeta->vgId); // stTrace("vgId:%d meta-wunlock", pMeta->vgId);
(void) taosThreadRwlockUnlock(&pMeta->lock); (void)taosThreadRwlockUnlock(&pMeta->lock);
} }
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) { int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
@ -1320,7 +1321,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (pTask == NULL) { if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
(void) streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
continue; continue;
} }
@ -1343,7 +1344,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (pTask == NULL) { if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
(void) streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
continue; continue;
} }
@ -1361,10 +1362,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
pTask->id.idStr); pTask->id.idStr);
(void) streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task? (void)streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task?
} }
(void) streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, true); (void)streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs,
true);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
continue; continue;
} }
@ -1420,14 +1422,14 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
continue; continue;
} }
(void) streamTaskStop(pTask); (void)streamTaskStop(pTask);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} }
@ -1467,7 +1469,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask); code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
if (pTask == NULL) { if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x when starting task", pMeta->vgId, taskId); stError("vgId:%d failed to acquire task:0x%x when starting task", pMeta->vgId, taskId);
(void) streamMetaAddFailedTask(pMeta, streamId, taskId); (void)streamMetaAddFailedTask(pMeta, streamId, taskId);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS; return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
} }
@ -1558,9 +1560,8 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
if (code) { if (code) {
} }
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
@ -1632,9 +1633,9 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
streamMetaRUnLock(pMeta); streamMetaRUnLock(pMeta);
// add the failed task info, along with the related fill-history task info into tasks list. // add the failed task info, along with the related fill-history task info into tasks list.
(void) streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); (void)streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
if (hasFillhistoryTask) { if (hasFillhistoryTask) {
(void) streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); (void)streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
} }
} else { } else {
streamMetaRUnLock(pMeta); streamMetaRUnLock(pMeta);
@ -1649,12 +1650,12 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) { void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) {
int32_t startTs = pTask->execInfo.checkTs; int32_t startTs = pTask->execInfo.checkTs;
(void) streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false); (void)streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
// automatically set the related fill-history task to be failed. // automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id; STaskId* pId = &pTask->hTaskInfo.id;
(void) streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false); (void)streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
} }
} }
@ -1662,7 +1663,7 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt
int64_t startTs) { int64_t startTs) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
int32_t code = 0; int32_t code = 0;
// keep the already updated info // keep the already updated info
STaskUpdateEntry entry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId}; STaskUpdateEntry entry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId};

View File

@ -69,7 +69,7 @@ void *backendOpen() {
key.ts = ts; key.ts = ts;
const char *val = "value data"; const char *val = "value data";
int32_t vlen = strlen(val); int32_t vlen = strlen(val);
int32_t code = streamStatePut_rocksdb(p, &key, (char *)val, vlen); int32_t code = streamStatePut_rocksdb(p, &key, (char *)val, vlen);
ASSERT(code == 0); ASSERT(code == 0);
tsArray.push_back(ts); tsArray.push_back(ts);
@ -83,7 +83,7 @@ void *backendOpen() {
const char *val = "value data"; const char *val = "value data";
int32_t len = 0; int32_t len = 0;
char *newVal = NULL; char *newVal = NULL;
int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
ASSERT(code == 0); ASSERT(code == 0);
ASSERT(len == strlen(val)); ASSERT(len == strlen(val));
@ -377,7 +377,7 @@ TEST_F(BackendEnv, checkOpen) {
char val[128] = {0}; char val[128] = {0};
sprintf(val, "val_%d", i); sprintf(val, "val_%d", i);
int32_t code = streamStatePutBatch(p, "default", (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, int32_t code = streamStatePutBatch(p, "default", (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
(int32_t)(strlen(val)), tsStart + 100000); (int32_t)(strlen(val)), tsStart + 100000);
ASSERT(code == 0); ASSERT(code == 0);
} }
@ -396,7 +396,7 @@ TEST_F(BackendEnv, checkOpen) {
char val[128] = {0}; char val[128] = {0};
sprintf(val, "val_%d", i); sprintf(val, "val_%d", i);
int32_t code = streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, int32_t code = streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
(int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf); (int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf);
ASSERT(code == 0); ASSERT(code == 0);
} }
int32_t code = streamStatePutBatch_rocksdb(p, pBatch); int32_t code = streamStatePutBatch_rocksdb(p, pBatch);
@ -417,7 +417,7 @@ TEST_F(BackendEnv, checkOpen) {
char val[128] = {0}; char val[128] = {0};
sprintf(val, "val_%d", i); sprintf(val, "val_%d", i);
int32_t code = streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, int32_t code = streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
(int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf); (int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf);
ASSERT(code == 0); ASSERT(code == 0);
} }
code = streamStatePutBatch_rocksdb(p, pBatch); code = streamStatePutBatch_rocksdb(p, pBatch);
@ -432,13 +432,12 @@ TEST_F(BackendEnv, checkOpen) {
const char *path = "/tmp/backend/stream"; const char *path = "/tmp/backend/stream";
const char *dump = "/tmp/backend/stream/dump"; const char *dump = "/tmp/backend/stream/dump";
// taosMkDir(dump); // taosMkDir(dump);
code = taosMulMkDir(dump); taosMulMkDir(dump);
ASSERT(code == 0); SBkdMgt *mgt = NULL;
SBkdMgt *mgt = bkdMgtCreate((char *)path); code = bkdMgtCreate((char *)path, &mgt);
SArray *result = taosArrayInit(4, sizeof(void *)); SArray *result = taosArrayInit(4, sizeof(void *));
code = bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump); bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump);
ASSERT(code == 0);
code = taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4, 0); code = taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4, 0);
ASSERT(code == 0); ASSERT(code == 0);
@ -475,7 +474,7 @@ TEST_F(BackendEnv, backendUtil) {
} }
TEST_F(BackendEnv, oldBackendInit) { TEST_F(BackendEnv, oldBackendInit) {
const char *path = "/tmp/backend1"; const char *path = "/tmp/backend1";
int32_t code = taosMulMkDir(path); int32_t code = taosMulMkDir(path);
ASSERT(code == 0); ASSERT(code == 0);
{ {

View File

@ -53,7 +53,7 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
void *p = NULL; void *p = NULL;
// SBackendWrapper *p = streamBackendInit(streamPath, -1, 2); // SBackendWrapper *p = streamBackendInit(streamPath, -1, 2);
// p = taskDbOpen((char *)streamPath, (char *)"test", -1); // p = taskDbOpen((char *)streamPath, (char *)"test", -1);
p = bkdMgtCreate((char *)streamPath); int32_t code = bkdMgtCreate((char *)streamPath, (SBkdMgt **)&p);
// const int64_t interval = 20 * 1000; // const int64_t interval = 20 * 1000;
// const int64_t watermark = 10 * 60 * 1000; // const int64_t watermark = 10 * 60 * 1000;

View File

@ -134,7 +134,7 @@ int32_t raftStoreWriteFile(SSyncNode *pNode) {
if (taosFsyncFile(pFile) < 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER); if (taosFsyncFile(pFile) < 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
taosCloseFile(&pFile); (void)taosCloseFile(&pFile);
if (taosRenameFile(file, realfile) != 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER); if (taosRenameFile(file, realfile) != 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
code = 0; code = 0;

View File

@ -84,7 +84,7 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) {
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) { int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
SyncAppendEntries* pMsg = pRpcMsg->pCont; SyncAppendEntries* pMsg = pRpcMsg->pCont;
pMsg->destId = *destRaftId; pMsg->destId = *destRaftId;
syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg); (void)syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg);
TAOS_RETURN(TSDB_CODE_SUCCESS); TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
@ -113,7 +113,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
// send msg // send msg
syncLogSendHeartbeat(pSyncNode, pSyncMsg, true, 0, 0); syncLogSendHeartbeat(pSyncNode, pSyncMsg, true, 0, 0);
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); (void)syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
} }
TAOS_RETURN(TSDB_CODE_SUCCESS); TAOS_RETURN(TSDB_CODE_SUCCESS);

View File

@ -135,7 +135,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
// trace log // trace log
syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, ""); syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, "");
syncLogSendRequestVoteReply(ths, pReply, ""); syncLogSendRequestVoteReply(ths, pReply, "");
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); (void)syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
if (resetElect) syncNodeResetElectTimer(ths); if (resetElect) syncNodeResetElectTimer(ths);

View File

@ -320,7 +320,7 @@ static void taosLRUCacheShardEvictLRU(SLRUCacheShard *shard, size_t charge, SArr
ASSERT(TAOS_LRU_ENTRY_IN_CACHE(old) && !TAOS_LRU_ENTRY_HAS_REFS(old)); ASSERT(TAOS_LRU_ENTRY_IN_CACHE(old) && !TAOS_LRU_ENTRY_HAS_REFS(old));
taosLRUCacheShardLRURemove(shard, old); taosLRUCacheShardLRURemove(shard, old);
taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash); (void)taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
ASSERT(shard->usage >= old->totalCharge); ASSERT(shard->usage >= old->totalCharge);
@ -531,7 +531,7 @@ static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) {
SLRUEntry *old = shard->lru.next; SLRUEntry *old = shard->lru.next;
ASSERT(TAOS_LRU_ENTRY_IN_CACHE(old) && !TAOS_LRU_ENTRY_HAS_REFS(old)); ASSERT(TAOS_LRU_ENTRY_IN_CACHE(old) && !TAOS_LRU_ENTRY_HAS_REFS(old));
taosLRUCacheShardLRURemove(shard, old); taosLRUCacheShardLRURemove(shard, old);
taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash); (void)taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
ASSERT(shard->usage >= old->totalCharge); ASSERT(shard->usage >= old->totalCharge);
shard->usage -= old->totalCharge; shard->usage -= old->totalCharge;
@ -577,7 +577,7 @@ static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, b
if (shard->usage > shard->capacity || eraseIfLastRef) { if (shard->usage > shard->capacity || eraseIfLastRef) {
ASSERT(shard->lru.next == &shard->lru || eraseIfLastRef); ASSERT(shard->lru.next == &shard->lru || eraseIfLastRef);
taosLRUEntryTableRemove(&shard->table, e->keyData, e->keyLength, e->hash); (void)taosLRUEntryTableRemove(&shard->table, e->keyData, e->keyLength, e->hash);
TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
} else { } else {
taosLRUCacheShardLRUInsert(shard, e); taosLRUCacheShardLRUInsert(shard, e);