Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/TD-30987-22
This commit is contained in:
commit
beec71374f
|
@ -33,7 +33,7 @@ void executeSQL(TAOS *taos, const char *sql) {
|
|||
void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) {
|
||||
if (code != 0) {
|
||||
printf("%s. error: %s\n", msg, taos_stmt_errstr(stmt));
|
||||
taos_stmt_close(stmt);
|
||||
(void)taos_stmt_close(stmt);
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
}
|
||||
|
@ -123,7 +123,7 @@ void insertData(TAOS *taos) {
|
|||
int affectedRows = taos_stmt_affected_rows(stmt);
|
||||
printf("successfully inserted %d rows\n", affectedRows);
|
||||
// close
|
||||
taos_stmt_close(stmt);
|
||||
(void)taos_stmt_close(stmt);
|
||||
}
|
||||
|
||||
int main() {
|
||||
|
|
|
@ -33,7 +33,7 @@ void executeSQL(TAOS *taos, const char *sql) {
|
|||
void checkErrorCode(TAOS_STMT *stmt, int code, const char* msg) {
|
||||
if (code != 0) {
|
||||
printf("%s. error: %s\n", msg, taos_stmt_errstr(stmt));
|
||||
taos_stmt_close(stmt);
|
||||
(void)taos_stmt_close(stmt);
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
}
|
||||
|
@ -119,7 +119,7 @@ void insertData(TAOS *taos) {
|
|||
int affectedRows = taos_stmt_affected_rows(stmt);
|
||||
printf("successfully inserted %d rows\n", affectedRows);
|
||||
// close
|
||||
taos_stmt_close(stmt);
|
||||
(void)taos_stmt_close(stmt);
|
||||
}
|
||||
|
||||
int main() {
|
||||
|
|
|
@ -1,66 +0,0 @@
|
|||
// A simple demo for asynchronous subscription.
|
||||
// compile with:
|
||||
// gcc -o subscribe_demo subscribe_demo.c -ltaos
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <taos.h>
|
||||
|
||||
int nTotalRows;
|
||||
|
||||
/**
|
||||
* @brief callback function of subscription.
|
||||
*
|
||||
* @param tsub
|
||||
* @param res
|
||||
* @param param. the additional parameter passed to taos_subscribe
|
||||
* @param code. error code
|
||||
*/
|
||||
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES* res, void* param, int code) {
|
||||
if (code != 0) {
|
||||
printf("error: %d\n", code);
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
TAOS_ROW row = NULL;
|
||||
int num_fields = taos_num_fields(res);
|
||||
TAOS_FIELD* fields = taos_fetch_fields(res);
|
||||
int nRows = 0;
|
||||
|
||||
while ((row = taos_fetch_row(res))) {
|
||||
char buf[4096] = {0};
|
||||
taos_print_row(buf, row, fields, num_fields);
|
||||
puts(buf);
|
||||
nRows++;
|
||||
}
|
||||
|
||||
nTotalRows += nRows;
|
||||
printf("%d rows consumed.\n", nRows);
|
||||
}
|
||||
|
||||
int main() {
|
||||
TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 6030);
|
||||
if (taos == NULL) {
|
||||
printf("failed to connect to server\n");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
int restart = 1; // if the topic already exists, where to subscribe from the begin.
|
||||
const char* topic = "topic-meter-current-bg-10";
|
||||
const char* sql = "select * from power.meters where current > 10";
|
||||
void* param = NULL; // additional parameter.
|
||||
int interval = 2000; // consumption interval in microseconds.
|
||||
TAOS_SUB* tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, NULL, interval);
|
||||
|
||||
// wait for insert from others process. you can open TDengine CLI to insert some records for test.
|
||||
|
||||
getchar(); // press Enter to stop
|
||||
|
||||
printf("total rows consumed: %d\n", nTotalRows);
|
||||
int keep = 0; // whether to keep subscribe process
|
||||
taos_unsubscribe(tsub, keep);
|
||||
|
||||
taos_close(taos);
|
||||
taos_cleanup();
|
||||
}
|
|
@ -238,7 +238,7 @@ const char* columnLevelStr(uint8_t type) {
|
|||
|
||||
bool checkColumnEncode(char encode[TSDB_CL_COMPRESS_OPTION_LEN]) {
|
||||
if (0 == strlen(encode)) return true;
|
||||
strtolower(encode, encode);
|
||||
(void)strtolower(encode, encode);
|
||||
for (int i = 0; i < supportedEncodeNum; ++i) {
|
||||
if (0 == strcmp((const char*)encode, supportedEncode[i])) {
|
||||
return true;
|
||||
|
@ -255,7 +255,7 @@ bool checkColumnEncodeOrSetDefault(uint8_t type, char encode[TSDB_CL_COMPRESS_OP
|
|||
}
|
||||
bool checkColumnCompress(char compress[TSDB_CL_COMPRESS_OPTION_LEN]) {
|
||||
if (0 == strlen(compress)) return true;
|
||||
strtolower(compress, compress);
|
||||
(void)strtolower(compress, compress);
|
||||
for (int i = 0; i < supportedCompressNum; ++i) {
|
||||
if (0 == strcmp((const char*)compress, supportedCompress[i])) {
|
||||
return true;
|
||||
|
@ -273,7 +273,7 @@ bool checkColumnCompressOrSetDefault(uint8_t type, char compress[TSDB_CL_COMPRES
|
|||
}
|
||||
bool checkColumnLevel(char level[TSDB_CL_COMPRESS_OPTION_LEN]) {
|
||||
if (0 == strlen(level)) return true;
|
||||
strtolower(level, level);
|
||||
(void)strtolower(level, level);
|
||||
if (1 == strlen(level)) {
|
||||
if ('h' == level[0] || 'm' == level[0] || 'l' == level[0]) return true;
|
||||
} else {
|
||||
|
|
|
@ -68,7 +68,7 @@ bool tdSTSRowIterFetch(STSRowIter *pIter, col_id_t colId, col_type_t colType, SC
|
|||
return false;
|
||||
}
|
||||
}
|
||||
tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset, pVal);
|
||||
(void)tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset, pVal);
|
||||
++pIter->colIdx;
|
||||
} else if (TD_IS_KV_ROW(pIter->pRow)) {
|
||||
return tdSTSRowIterGetKvVal(pIter, colId, &pIter->kvIdx, pVal);
|
||||
|
|
|
@ -212,8 +212,7 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
|
|||
mndTransRefresh(pMnode, pTrans);
|
||||
|
||||
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
|
||||
sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta);
|
||||
code = 0;
|
||||
code = sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta);
|
||||
|
||||
_OUT:
|
||||
if (pTrans) mndReleaseTrans(pMnode, pTrans);
|
||||
|
@ -222,7 +221,7 @@ _OUT:
|
|||
|
||||
static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
taosThreadMutexLock(&pMgmt->lock);
|
||||
(void)taosThreadMutexLock(&pMgmt->lock);
|
||||
if (pMgmt->transId == 0) {
|
||||
goto _OUT;
|
||||
}
|
||||
|
@ -232,7 +231,7 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
|
|||
pMgmt->transSec = 0;
|
||||
pMgmt->transSeq = 0;
|
||||
pMgmt->errCode = code;
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
(void)tsem_post(&pMgmt->syncSem);
|
||||
|
||||
if (pMgmt->errCode != 0) {
|
||||
mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
|
||||
|
@ -241,7 +240,7 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
|
|||
}
|
||||
|
||||
_OUT:
|
||||
taosThreadMutexUnlock(&pMgmt->lock);
|
||||
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -304,7 +303,7 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
|
|||
} else {
|
||||
mInfo("vgId:1, sync restore finished");
|
||||
}
|
||||
mndRefreshUserIpWhiteList(pMnode);
|
||||
(void)mndRefreshUserIpWhiteList(pMnode);
|
||||
|
||||
ASSERT(commitIdx == mndSyncAppliedIndex(pFsm));
|
||||
}
|
||||
|
@ -350,16 +349,16 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) {
|
|||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
mInfo("vgId:1, become follower");
|
||||
|
||||
taosThreadMutexLock(&pMgmt->lock);
|
||||
(void)taosThreadMutexLock(&pMgmt->lock);
|
||||
if (pMgmt->transId != 0) {
|
||||
mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
|
||||
pMgmt->transId = 0;
|
||||
pMgmt->transSec = 0;
|
||||
pMgmt->transSeq = 0;
|
||||
pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
(void)tsem_post(&pMgmt->syncSem);
|
||||
}
|
||||
taosThreadMutexUnlock(&pMgmt->lock);
|
||||
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||
}
|
||||
|
||||
static void mndBecomeLearner(const SSyncFSM *pFsm) {
|
||||
|
@ -367,16 +366,16 @@ static void mndBecomeLearner(const SSyncFSM *pFsm) {
|
|||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
mInfo("vgId:1, become learner");
|
||||
|
||||
taosThreadMutexLock(&pMgmt->lock);
|
||||
(void)taosThreadMutexLock(&pMgmt->lock);
|
||||
if (pMgmt->transId != 0) {
|
||||
mInfo("vgId:1, become learner and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
|
||||
pMgmt->transId = 0;
|
||||
pMgmt->transSec = 0;
|
||||
pMgmt->transSeq = 0;
|
||||
pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
(void)tsem_post(&pMgmt->syncSem);
|
||||
}
|
||||
taosThreadMutexUnlock(&pMgmt->lock);
|
||||
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||
}
|
||||
|
||||
static void mndBecomeLeader(const SSyncFSM *pFsm) {
|
||||
|
@ -435,12 +434,12 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
|||
|
||||
int32_t mndInitSync(SMnode *pMnode) {
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
taosThreadMutexInit(&pMgmt->lock, NULL);
|
||||
taosThreadMutexLock(&pMgmt->lock);
|
||||
(void)taosThreadMutexInit(&pMgmt->lock, NULL);
|
||||
(void)taosThreadMutexLock(&pMgmt->lock);
|
||||
pMgmt->transId = 0;
|
||||
pMgmt->transSec = 0;
|
||||
pMgmt->transSeq = 0;
|
||||
taosThreadMutexUnlock(&pMgmt->lock);
|
||||
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||
|
||||
SSyncInfo syncInfo = {
|
||||
.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
|
||||
|
@ -477,7 +476,7 @@ int32_t mndInitSync(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
int32_t code = 0;
|
||||
tsem_init(&pMgmt->syncSem, 0, 0);
|
||||
(void)tsem_init(&pMgmt->syncSem, 0, 0);
|
||||
pMgmt->sync = syncOpen(&syncInfo, true);
|
||||
if (pMgmt->sync <= 0) {
|
||||
if (terrno != 0) code = terrno;
|
||||
|
@ -495,15 +494,15 @@ void mndCleanupSync(SMnode *pMnode) {
|
|||
syncStop(pMgmt->sync);
|
||||
mInfo("mnode-sync is stopped, id:%" PRId64, pMgmt->sync);
|
||||
|
||||
tsem_destroy(&pMgmt->syncSem);
|
||||
taosThreadMutexDestroy(&pMgmt->lock);
|
||||
(void)tsem_destroy(&pMgmt->syncSem);
|
||||
(void)taosThreadMutexDestroy(&pMgmt->lock);
|
||||
memset(pMgmt, 0, sizeof(SSyncMgmt));
|
||||
}
|
||||
|
||||
void mndSyncCheckTimeout(SMnode *pMnode) {
|
||||
mTrace("check sync timeout");
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
taosThreadMutexLock(&pMgmt->lock);
|
||||
(void)taosThreadMutexLock(&pMgmt->lock);
|
||||
if (pMgmt->transId != 0) {
|
||||
int32_t curSec = taosGetTimestampSec();
|
||||
int32_t delta = curSec - pMgmt->transSec;
|
||||
|
@ -515,7 +514,7 @@ void mndSyncCheckTimeout(SMnode *pMnode) {
|
|||
pMgmt->transSeq = 0;
|
||||
terrno = TSDB_CODE_SYN_TIMEOUT;
|
||||
pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT;
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
(void)tsem_post(&pMgmt->syncSem);
|
||||
} else {
|
||||
mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
|
||||
pMgmt->transSec, curSec, curSec - pMgmt->transSec, pMgmt->transSeq);
|
||||
|
@ -523,7 +522,7 @@ void mndSyncCheckTimeout(SMnode *pMnode) {
|
|||
} else {
|
||||
// mTrace("check sync timeout msg, no trans waiting for confirm");
|
||||
}
|
||||
taosThreadMutexUnlock(&pMgmt->lock);
|
||||
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||
}
|
||||
|
||||
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
||||
|
@ -536,12 +535,12 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
|||
if (req.pCont == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
memcpy(req.pCont, pRaw, req.contLen);
|
||||
|
||||
taosThreadMutexLock(&pMgmt->lock);
|
||||
(void)taosThreadMutexLock(&pMgmt->lock);
|
||||
pMgmt->errCode = 0;
|
||||
|
||||
if (pMgmt->transId != 0) {
|
||||
mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
|
||||
taosThreadMutexUnlock(&pMgmt->lock);
|
||||
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||
rpcFreeCont(req.pCont);
|
||||
TAOS_RETURN(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED);
|
||||
}
|
||||
|
@ -555,23 +554,24 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
|||
if (code == 0) {
|
||||
mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq);
|
||||
pMgmt->transSeq = seq;
|
||||
taosThreadMutexUnlock(&pMgmt->lock);
|
||||
tsem_wait(&pMgmt->syncSem);
|
||||
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||
(void)tsem_wait(&pMgmt->syncSem);
|
||||
} else if (code > 0) {
|
||||
mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
|
||||
pMgmt->transId = 0;
|
||||
pMgmt->transSec = 0;
|
||||
pMgmt->transSeq = 0;
|
||||
taosThreadMutexUnlock(&pMgmt->lock);
|
||||
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
|
||||
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
|
||||
code = 0;
|
||||
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||
code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
|
||||
if (code == 0) {
|
||||
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
|
||||
}
|
||||
} else {
|
||||
mError("trans:%d, failed to proposed since %s", transId, terrstr());
|
||||
pMgmt->transId = 0;
|
||||
pMgmt->transSec = 0;
|
||||
pMgmt->transSeq = 0;
|
||||
taosThreadMutexUnlock(&pMgmt->lock);
|
||||
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||
if (terrno == 0) {
|
||||
terrno = TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
|
@ -600,15 +600,15 @@ void mndSyncStart(SMnode *pMnode) {
|
|||
void mndSyncStop(SMnode *pMnode) {
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
|
||||
taosThreadMutexLock(&pMgmt->lock);
|
||||
(void)taosThreadMutexLock(&pMgmt->lock);
|
||||
if (pMgmt->transId != 0) {
|
||||
mInfo("vgId:1, is stopped and post sem, trans:%d", pMgmt->transId);
|
||||
pMgmt->transId = 0;
|
||||
pMgmt->transSec = 0;
|
||||
pMgmt->errCode = TSDB_CODE_APP_IS_STOPPING;
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
(void)tsem_post(&pMgmt->syncSem);
|
||||
}
|
||||
taosThreadMutexUnlock(&pMgmt->lock);
|
||||
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||
}
|
||||
|
||||
bool mndIsLeader(SMnode *pMnode) {
|
||||
|
|
|
@ -682,7 +682,7 @@ static void ipRangeToStr(SIpV4Range *range, char *buf) {
|
|||
struct in_addr addr;
|
||||
addr.s_addr = range->ip;
|
||||
|
||||
uv_inet_ntop(AF_INET, &addr, buf, 32);
|
||||
(void)uv_inet_ntop(AF_INET, &addr, buf, 32);
|
||||
if (range->mask != 32) {
|
||||
(void)sprintf(buf + strlen(buf), "/%d", range->mask);
|
||||
}
|
||||
|
@ -2188,7 +2188,7 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode
|
|||
mndReleaseDb(pMnode, pDb);
|
||||
TAOS_CHECK_GOTO(terrno, &lino, _OVER); // TODO: refactor the terrno to code
|
||||
}
|
||||
taosHashRemove(pNewUser->readDbs, pAlterReq->objname, len);
|
||||
(void)taosHashRemove(pNewUser->readDbs, pAlterReq->objname, len);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
} else {
|
||||
taosHashClear(pNewUser->readDbs);
|
||||
|
@ -2204,7 +2204,7 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode
|
|||
mndReleaseDb(pMnode, pDb);
|
||||
TAOS_CHECK_GOTO(terrno, &lino, _OVER); // TODO: refactor the terrno to code
|
||||
}
|
||||
taosHashRemove(pNewUser->writeDbs, pAlterReq->objname, len);
|
||||
(void)taosHashRemove(pNewUser->writeDbs, pAlterReq->objname, len);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
} else {
|
||||
taosHashClear(pNewUser->writeDbs);
|
||||
|
@ -2311,7 +2311,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
|
|||
|
||||
TAOS_CHECK_GOTO(mndAcquireUser(pMnode, alterReq.user, &pUser), &lino, _OVER);
|
||||
|
||||
mndAcquireUser(pMnode, pReq->info.conn.user, &pOperUser);
|
||||
(void)mndAcquireUser(pMnode, pReq->info.conn.user, &pOperUser);
|
||||
if (pOperUser == NULL) {
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_MND_NO_USER_FROM_CONN, &lino, _OVER);
|
||||
}
|
||||
|
@ -2517,7 +2517,7 @@ static int32_t mndDropUser(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser) {
|
|||
mndTransDrop(pTrans);
|
||||
TAOS_RETURN(terrno);
|
||||
}
|
||||
ipWhiteMgtRemove(pUser->user);
|
||||
(void)ipWhiteMgtRemove(pUser->user);
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
TAOS_RETURN(0);
|
||||
|
@ -2830,7 +2830,10 @@ static int32_t mndLoopHash(SHashObj *hash, char *priType, SSDataBlock *pBlock, i
|
|||
}
|
||||
|
||||
if (nodesStringToNode(value, &pAst) == 0) {
|
||||
nodesNodeToSQL(pAst, *sql, bufSz, &sqlLen);
|
||||
if (nodesNodeToSQL(pAst, *sql, bufSz, &sqlLen) != 0) {
|
||||
sqlLen = 5;
|
||||
(void)sprintf(*sql, "error");
|
||||
}
|
||||
nodesDestroyNode(pAst);
|
||||
} else {
|
||||
sqlLen = 5;
|
||||
|
|
|
@ -194,7 +194,7 @@ typedef enum {
|
|||
|
||||
#define TD_SMA_LOOPS_CHECK(n, limit) \
|
||||
if (++(n) > limit) { \
|
||||
sched_yield(); \
|
||||
(void)sched_yield(); \
|
||||
(n) = 0; \
|
||||
}
|
||||
|
||||
|
|
|
@ -257,7 +257,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
|
|||
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
|
||||
int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
|
||||
if (refVal == 0) {
|
||||
taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid));
|
||||
(void)taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid));
|
||||
} else {
|
||||
smaDebug(
|
||||
"vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
|
||||
|
|
|
@ -46,7 +46,7 @@ int32_t smaInit() {
|
|||
old = atomic_val_compare_exchange_8(&smaMgmt.inited, 0, 2);
|
||||
if (old != 2) break;
|
||||
if (++nLoops > 1000) {
|
||||
sched_yield();
|
||||
(void)sched_yield();
|
||||
nLoops = 0;
|
||||
}
|
||||
}
|
||||
|
@ -69,7 +69,7 @@ int32_t smaInit() {
|
|||
|
||||
if (!smaMgmt.refHash || !smaMgmt.tmrHandle) {
|
||||
code = terrno;
|
||||
taosCloseRef(smaMgmt.rsetId);
|
||||
(void)taosCloseRef(smaMgmt.rsetId);
|
||||
if (smaMgmt.refHash) {
|
||||
taosHashCleanup(smaMgmt.refHash);
|
||||
smaMgmt.refHash = NULL;
|
||||
|
@ -97,7 +97,7 @@ void smaCleanUp() {
|
|||
old = atomic_val_compare_exchange_8(&smaMgmt.inited, 1, 2);
|
||||
if (old != 2) break;
|
||||
if (++nLoops > 1000) {
|
||||
sched_yield();
|
||||
(void)sched_yield();
|
||||
nLoops = 0;
|
||||
}
|
||||
}
|
||||
|
@ -130,7 +130,7 @@ static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
|
|||
: atomic_store_ptr(&SMA_RSMA_ENV(pSma), *ppEnv);
|
||||
|
||||
if ((code = tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma)) != TSDB_CODE_SUCCESS) {
|
||||
tdFreeSmaEnv(pEnv);
|
||||
(void)tdFreeSmaEnv(pEnv);
|
||||
*ppEnv = NULL;
|
||||
(smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), NULL)
|
||||
: atomic_store_ptr(&SMA_RSMA_ENV(pSma), NULL);
|
||||
|
@ -179,7 +179,7 @@ static void tRSmaInfoHashFreeNode(void *data) {
|
|||
if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 1)) && pItem->level) {
|
||||
(void)taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES);
|
||||
}
|
||||
tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo);
|
||||
(void)tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -207,7 +207,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
|
|||
SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat);
|
||||
pRSmaStat->pSma = (SSma *)pSma;
|
||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
|
||||
tsem_init(&pRSmaStat->notEmpty, 0, 0);
|
||||
(void)tsem_init(&pRSmaStat->notEmpty, 0, 0);
|
||||
if (!(pRSmaStat->blocks = taosArrayInit(1, sizeof(SSDataBlock)))) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
|
@ -216,7 +216,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
|
|||
(void)taosArrayPush(pRSmaStat->blocks, &datablock);
|
||||
|
||||
// init smaMgmt
|
||||
smaInit();
|
||||
TAOS_CHECK_GOTO(smaInit(), &lino, _exit);
|
||||
|
||||
int64_t refId = taosAddRef(smaMgmt.rsetId, pRSmaStat);
|
||||
if (refId < 0) {
|
||||
|
@ -285,20 +285,20 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
|
|||
}
|
||||
|
||||
// step 3:
|
||||
tdRsmaStopExecutor(pSma);
|
||||
(void)tdRsmaStopExecutor(pSma);
|
||||
|
||||
// step 4: destroy the rsma info and associated fetch tasks
|
||||
taosHashCleanup(RSMA_INFO_HASH(pStat));
|
||||
|
||||
// step 5: free pStat
|
||||
tsem_destroy(&(pStat->notEmpty));
|
||||
(void)tsem_destroy(&(pStat->notEmpty));
|
||||
taosArrayDestroy(pStat->blocks);
|
||||
taosMemoryFreeClear(pStat);
|
||||
}
|
||||
}
|
||||
|
||||
static void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
|
||||
tdDestroySmaState(pSmaStat, smaType);
|
||||
(void)tdDestroySmaState(pSmaStat, smaType);
|
||||
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
||||
taosMemoryFreeClear(pSmaStat);
|
||||
}
|
||||
|
@ -438,7 +438,7 @@ static int32_t tdRsmaStopExecutor(const SSma *pSma) {
|
|||
pthread = (TdThread *)&pStat->data;
|
||||
|
||||
for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) {
|
||||
tsem_post(&(pRSmaStat->notEmpty));
|
||||
(void)tsem_post(&(pRSmaStat->notEmpty));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) {
|
||||
|
|
|
@ -412,7 +412,7 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
|||
|
||||
_exit:
|
||||
if (code != 0) {
|
||||
tdFreeRSmaInfo(pSma, pRSmaInfo);
|
||||
(void)tdFreeRSmaInfo(pSma, pRSmaInfo);
|
||||
} else {
|
||||
smaDebug("vgId:%d, register rsma info succeed for table %" PRIi64, SMA_VID(pSma), suid);
|
||||
}
|
||||
|
@ -1264,7 +1264,7 @@ _checkpoint:
|
|||
if (pItem && pItem->pStreamTask) {
|
||||
SStreamTask *pTask = pItem->pStreamTask;
|
||||
// atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1);
|
||||
streamTaskSetActiveCheckpointInfo(pTask, checkpointId);
|
||||
(void)streamTaskSetActiveCheckpointInfo(pTask, checkpointId);
|
||||
|
||||
pTask->chkInfo.checkpointId = checkpointId; // 1pTask->checkpointingId;
|
||||
pTask->chkInfo.checkpointVer = pItem->submitReqVer;
|
||||
|
@ -1373,7 +1373,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
|||
", rsetId:%d refId:%" PRIi64,
|
||||
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaRef->refId);
|
||||
if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
|
||||
taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
||||
(void)taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
||||
}
|
||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||
(void)tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
|
||||
|
@ -1629,7 +1629,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
|||
batchMax = TMAX(batchMax, 4);
|
||||
}
|
||||
while (occupied || (++batchCnt < batchMax)) { // greedy mode
|
||||
taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
|
||||
(void)taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
|
||||
int32_t qallItemSize = taosQallItemSize(pInfo->qall);
|
||||
if (qallItemSize > 0) {
|
||||
if ((code = tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type)) != 0) {
|
||||
|
@ -1663,7 +1663,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
|||
}
|
||||
|
||||
if (qallItemSize > 0) {
|
||||
atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize);
|
||||
(void)atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize);
|
||||
continue;
|
||||
}
|
||||
if (RSMA_NEED_FETCH(pInfo)) {
|
||||
|
@ -1673,7 +1673,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
|||
break;
|
||||
}
|
||||
}
|
||||
atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0);
|
||||
(void)atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -123,7 +123,7 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t ver, const char *pMsg
|
|||
TAOS_CHECK_EXIT(metaCreateTSma(SMA_META(pSma), ver, pCfg));
|
||||
|
||||
// create stable to save tsma result in dstVgId
|
||||
tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
(void)tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
pReq.name = (char *)tNameGetTableName(&stbFullName);
|
||||
pReq.suid = pCfg->dstTbUid;
|
||||
pReq.schemaRow = pCfg->schemaRow;
|
||||
|
@ -283,7 +283,7 @@ static int32_t tsmaProcessDelReq(SSma *pSma, int64_t indexUid, SBatchDeleteReq *
|
|||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len);
|
||||
tEncodeSBatchDeleteReq(&encoder, pDelReq);
|
||||
(void)tEncodeSBatchDeleteReq(&encoder, pDelReq);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode);
|
||||
|
|
|
@ -39,13 +39,15 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
|
|||
// alloc
|
||||
pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader));
|
||||
if (pReader == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
TAOS_CHECK_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
pReader->pTq = pTq;
|
||||
pReader->sver = sver;
|
||||
pReader->ever = ever;
|
||||
pReader->tdbTbList = taosArrayInit(4, sizeof(STablePair));
|
||||
if (pReader->tdbTbList == NULL) {
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _err);
|
||||
}
|
||||
|
||||
STablePair pair1 = {.tbl = pTq->pStreamMeta->pTaskDb, .type = SNAP_DATA_STREAM_TASK};
|
||||
taosArrayPush(pReader->tdbTbList, &pair1);
|
||||
|
@ -60,16 +62,14 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
|
|||
if (code) {
|
||||
tqInfo("vgId:%d, vnode stream-task snapshot reader failed to open, reason: %s", TD_VID(pTq->pVnode),
|
||||
tstrerror(code));
|
||||
taosMemoryFree(pReader);
|
||||
goto _err;
|
||||
TAOS_CHECK_GOTO(code, NULL, _err);
|
||||
}
|
||||
|
||||
code = tdbTbcMoveToFirst(pReader->pCur);
|
||||
if (code) {
|
||||
tqInfo("vgId:%d, vnode stream-task snapshot reader failed to iterate, reason: %s", TD_VID(pTq->pVnode),
|
||||
tstrerror(code));
|
||||
taosMemoryFree(pReader);
|
||||
goto _err;
|
||||
TAOS_CHECK_GOTO(code, NULL, _err);
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d, vnode stream-task snapshot reader opened", TD_VID(pTq->pVnode));
|
||||
|
@ -79,11 +79,14 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
|
|||
|
||||
_err:
|
||||
tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
streamTaskSnapReaderClose(pReader);
|
||||
*ppReader = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) {
|
||||
if (pReader == NULL) return 0;
|
||||
|
||||
int32_t code = 0;
|
||||
tqInfo("vgId:%d, vnode stream-task snapshot reader closed", TD_VID(pReader->pTq->pVnode));
|
||||
taosArrayDestroy(pReader->tdbTbList);
|
||||
|
@ -116,6 +119,10 @@ NextTbl:
|
|||
break;
|
||||
} else {
|
||||
pVal = taosMemoryCalloc(1, tLen);
|
||||
if (pVal == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
memcpy(pVal, tVal, tLen);
|
||||
vLen = tLen;
|
||||
}
|
||||
|
@ -174,8 +181,7 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
|
|||
// alloc
|
||||
pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
|
||||
if (pWriter == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
TAOS_CHECK_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
pWriter->pTq = pTq;
|
||||
pWriter->sver = sver;
|
||||
|
@ -184,12 +190,6 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
|
|||
*ppWriter = pWriter;
|
||||
tqDebug("vgId:%d, vnode stream-task snapshot writer opened", TD_VID(pTq->pVnode));
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d, vnode stream-task snapshot writer failed to write since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
*ppWriter = NULL;
|
||||
return code;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
|
||||
|
@ -207,8 +207,7 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
|
|||
if (code) goto _err;
|
||||
}
|
||||
|
||||
if (tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
|
||||
code = -1;
|
||||
if ((code = tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0)) < 0) {
|
||||
taosMemoryFree(pWriter);
|
||||
goto _err;
|
||||
}
|
||||
|
@ -241,10 +240,11 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
|
|||
|
||||
int64_t key[2] = {taskId.streamId, taskId.taskId};
|
||||
streamMetaWLock(pTq->pStreamMeta);
|
||||
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr),
|
||||
nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) {
|
||||
if ((code =
|
||||
tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr),
|
||||
nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn)) < 0) {
|
||||
streamMetaWUnLock(pTq->pStreamMeta);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
streamMetaWUnLock(pTq->pStreamMeta);
|
||||
} else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) {
|
||||
|
|
|
@ -128,7 +128,7 @@ static STaskQueue taskQueue = {0};
|
|||
|
||||
static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
|
||||
__async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
|
||||
execFn(pSchedMsg->thandle);
|
||||
(void)execFn(pSchedMsg->thandle);
|
||||
taosFreeQitem(pSchedMsg);
|
||||
}
|
||||
|
||||
|
|
|
@ -143,9 +143,9 @@ SListNode* streamBackendAddCompare(void* backend, void* arg);
|
|||
void streamBackendDelCompare(void* backend, void* arg);
|
||||
int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst);
|
||||
|
||||
STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer);
|
||||
void taskDbDestroy(void* pBackend, bool flush);
|
||||
void taskDbDestroy2(void* pBackend);
|
||||
int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer, STaskDbWrapper** ppTaskDb);
|
||||
void taskDbDestroy(void* pBackend, bool flush);
|
||||
void taskDbDestroy2(void* pBackend);
|
||||
|
||||
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId);
|
||||
|
||||
|
@ -252,7 +252,7 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo);
|
|||
|
||||
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId);
|
||||
|
||||
SBkdMgt* bkdMgtCreate(char* path);
|
||||
int32_t bkdMgtCreate(char* path, SBkdMgt **bm);
|
||||
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path);
|
||||
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name);
|
||||
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);
|
||||
|
|
|
@ -2525,35 +2525,35 @@ _EXIT:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer) {
|
||||
int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer, STaskDbWrapper** ppTaskDb) {
|
||||
char* statePath = NULL;
|
||||
char* dbPath = NULL;
|
||||
int code = 0;
|
||||
terrno = 0;
|
||||
if ((code = restoreCheckpointData(path, key, chkptId, &statePath, &dbPath, processVer)) < 0) {
|
||||
terrno = code;
|
||||
stError("failed to restore checkpoint data, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key,
|
||||
chkptId, tstrerror(terrno));
|
||||
return NULL;
|
||||
chkptId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath);
|
||||
if (pTaskDb != NULL) {
|
||||
int64_t chkpId = -1, ver = -1;
|
||||
if ((code = chkpLoadExtraInfo(dbPath, &chkpId, &ver) == 0)) {
|
||||
if ((code = chkpLoadExtraInfo(dbPath, &chkpId, &ver)) == 0) {
|
||||
*processVer = ver;
|
||||
} else {
|
||||
terrno = code;
|
||||
stError("failed to load extra info, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, chkptId,
|
||||
tstrerror(terrno));
|
||||
tstrerror(code));
|
||||
taskDbDestroy(pTaskDb, false);
|
||||
return NULL;
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
taosMemoryFree(dbPath);
|
||||
taosMemoryFree(statePath);
|
||||
return pTaskDb;
|
||||
*ppTaskDb = pTaskDb;
|
||||
return code;
|
||||
}
|
||||
|
||||
void taskDbDestroy(void* pDb, bool flush) {
|
||||
|
@ -2794,8 +2794,10 @@ int32_t streamStateCvtDataFormat(char* path, char* key, void* pCfInst) {
|
|||
int32_t code = 0;
|
||||
|
||||
int64_t processVer = -1;
|
||||
STaskDbWrapper* pTaskDb = taskDbOpen(path, key, 0, &processVer);
|
||||
RocksdbCfInst* pSrcBackend = pCfInst;
|
||||
STaskDbWrapper* pTaskDb = NULL;
|
||||
|
||||
code = taskDbOpen(path, key, 0, &processVer, &pTaskDb);
|
||||
RocksdbCfInst* pSrcBackend = pCfInst;
|
||||
|
||||
for (int i = 0; i < nCf; i++) {
|
||||
rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i];
|
||||
|
@ -4626,10 +4628,11 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
|
|||
|
||||
void dbChkpDestroy(SDbChkp* pChkp);
|
||||
|
||||
SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) {
|
||||
int32_t dbChkpCreate(char* path, int64_t initChkpId, SDbChkp** ppChkp) {
|
||||
int32_t code = 0;
|
||||
SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp));
|
||||
if (p == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _EXIT;
|
||||
}
|
||||
|
||||
|
@ -4637,41 +4640,41 @@ SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) {
|
|||
p->preCkptId = -1;
|
||||
p->pSST = taosArrayInit(64, sizeof(void*));
|
||||
if (p->pSST == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
dbChkpDestroy(p);
|
||||
return NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
p->path = path;
|
||||
p->len = strlen(path) + 128;
|
||||
p->buf = taosMemoryCalloc(1, p->len);
|
||||
if (p->buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _EXIT;
|
||||
}
|
||||
|
||||
p->idx = 0;
|
||||
p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||
if (p->pSstTbl[0] == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _EXIT;
|
||||
}
|
||||
|
||||
p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||
if (p->pSstTbl[1] == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _EXIT;
|
||||
}
|
||||
|
||||
p->pAdd = taosArrayInit(64, sizeof(void*));
|
||||
if (p->pAdd == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _EXIT;
|
||||
}
|
||||
|
||||
p->pDel = taosArrayInit(64, sizeof(void*));
|
||||
if (p->pDel == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _EXIT;
|
||||
}
|
||||
|
||||
|
@ -4679,15 +4682,15 @@ SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) {
|
|||
taosThreadRwlockInit(&p->rwLock, NULL);
|
||||
|
||||
SArray* list = NULL;
|
||||
int32_t code = dbChkpGetDelta(p, initChkpId, list);
|
||||
code = dbChkpGetDelta(p, initChkpId, list);
|
||||
if (code != 0) {
|
||||
goto _EXIT;
|
||||
}
|
||||
|
||||
return p;
|
||||
*ppChkp = p;
|
||||
return code;
|
||||
_EXIT:
|
||||
dbChkpDestroy(p);
|
||||
return NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
void dbChkpDestroy(SDbChkp* pChkp) {
|
||||
|
@ -4880,35 +4883,36 @@ _ERROR:
|
|||
return code;
|
||||
}
|
||||
|
||||
SBkdMgt* bkdMgtCreate(char* path) {
|
||||
terrno = 0;
|
||||
int32_t bkdMgtCreate(char* path, SBkdMgt** mgt) {
|
||||
int32_t code = 0;
|
||||
SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt));
|
||||
if (p == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return code;
|
||||
}
|
||||
|
||||
p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
if (p->pDbChkpTbl == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
bkdMgtDestroy(p);
|
||||
return NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
p->path = taosStrdup(path);
|
||||
if (p->path == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
bkdMgtDestroy(p);
|
||||
return NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (taosThreadRwlockInit(&p->rwLock, NULL) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
bkdMgtDestroy(p);
|
||||
return NULL;
|
||||
return code;
|
||||
}
|
||||
*mgt = p;
|
||||
|
||||
return p;
|
||||
return code;
|
||||
}
|
||||
|
||||
void bkdMgtDestroy(SBkdMgt* bm) {
|
||||
|
@ -4949,11 +4953,11 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
|
|||
return code;
|
||||
}
|
||||
|
||||
SDbChkp* p = dbChkpCreate(path, chkpId);
|
||||
if (p == NULL) {
|
||||
SDbChkp* p = NULL;
|
||||
code = dbChkpCreate(path, chkpId, &p);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(path);
|
||||
taosThreadRwlockUnlock(&bm->rwLock);
|
||||
code = terrno;
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -4986,8 +4990,9 @@ int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path) {
|
|||
taosThreadRwlockWrlock(&bm->rwLock);
|
||||
SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task));
|
||||
if (pp == NULL) {
|
||||
SDbChkp* p = dbChkpCreate(path, 0);
|
||||
if (p != NULL) {
|
||||
SDbChkp* p = NULL;
|
||||
code = dbChkpCreate(path, 0, &p);
|
||||
if (code != 0) {
|
||||
taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void*));
|
||||
code = 0;
|
||||
}
|
||||
|
|
|
@ -68,12 +68,12 @@ static void streamMetaEnvInit() {
|
|||
}
|
||||
}
|
||||
|
||||
void streamMetaInit() { (void) taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
|
||||
void streamMetaInit() { (void)taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
|
||||
|
||||
void streamMetaCleanup() {
|
||||
(void) taosCloseRef(streamBackendId);
|
||||
(void) taosCloseRef(streamBackendCfWrapperId);
|
||||
(void) taosCloseRef(streamMetaId);
|
||||
(void)taosCloseRef(streamBackendId);
|
||||
(void)taosCloseRef(streamBackendCfWrapperId);
|
||||
(void)taosCloseRef(streamMetaId);
|
||||
|
||||
metaRefMgtCleanup();
|
||||
streamTimerCleanUp();
|
||||
|
@ -128,12 +128,12 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
|
|||
|
||||
code = taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*));
|
||||
if (code) {
|
||||
stError("vgId:%d failed to put into metaRef table, rid:%" PRId64, (int32_t) vgId, *rid);
|
||||
stError("vgId:%d failed to put into metaRef table, rid:%" PRId64, (int32_t)vgId, *rid);
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
SArray* list = *(SArray**)p;
|
||||
void* px = taosArrayPush(list, &rid);
|
||||
void* px = taosArrayPush(list, &rid);
|
||||
if (px == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -186,7 +186,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
|
|||
|
||||
code = tdbTbcMoveToFirst(pCur);
|
||||
if (code) {
|
||||
(void) tdbTbcClose(pCur);
|
||||
(void)tdbTbcClose(pCur);
|
||||
stError("vgId:%d failed to open stream meta file cursor, not perform compatible check", pMeta->vgId);
|
||||
return ret;
|
||||
}
|
||||
|
@ -215,7 +215,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
|
|||
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
(void) tdbTbcClose(pCur);
|
||||
(void)tdbTbcClose(pCur);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -276,6 +276,7 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
|
|||
}
|
||||
|
||||
int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) {
|
||||
int32_t code = 0;
|
||||
int64_t chkpId = pTask->chkInfo.checkpointId;
|
||||
|
||||
streamMutexLock(&pMeta->backendMutex);
|
||||
|
@ -299,8 +300,8 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
|
|||
STaskDbWrapper* pBackend = NULL;
|
||||
int64_t processVer = -1;
|
||||
while (1) {
|
||||
pBackend = taskDbOpen(pMeta->path, key, chkpId, &processVer);
|
||||
if (pBackend != NULL) {
|
||||
code = taskDbOpen(pMeta->path, key, chkpId, &processVer, &pBackend);
|
||||
if (code == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -319,7 +320,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
|
|||
|
||||
if (processVer != -1) pTask->chkInfo.processedVer = processVer;
|
||||
|
||||
int32_t code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
|
||||
code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
|
||||
if (code) {
|
||||
stError("s-task:0x%x failed to put taskDb backend, code:out of memory", pTask->id.taskId);
|
||||
}
|
||||
|
@ -469,8 +470,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
|
|||
|
||||
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
|
||||
|
||||
pMeta->bkdChkptMgt = bkdMgtCreate(tpath);
|
||||
if (pMeta->bkdChkptMgt == NULL) {
|
||||
code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt);
|
||||
if (code != 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
|
@ -485,7 +486,7 @@ _err:
|
|||
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
|
||||
if (pMeta->pTaskDb) (void)tdbTbClose(pMeta->pTaskDb);
|
||||
if (pMeta->pCheckpointDb) (void)tdbTbClose(pMeta->pCheckpointDb);
|
||||
if (pMeta->db) (void) tdbClose(pMeta->db);
|
||||
if (pMeta->db) (void)tdbClose(pMeta->db);
|
||||
if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
|
||||
if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
|
||||
if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
|
||||
|
@ -531,7 +532,7 @@ void streamMetaClear(SStreamMeta* pMeta) {
|
|||
// release the ref by timer
|
||||
if (p->info.delaySchedParam != 0 && p->info.fillHistory == 0) { // one more ref in timer
|
||||
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt);
|
||||
(void) taosTmrStop(p->schedInfo.pDelayTimer);
|
||||
(void)taosTmrStop(p->schedInfo.pDelayTimer);
|
||||
p->info.delaySchedParam = 0;
|
||||
streamMetaReleaseTask(pMeta, p);
|
||||
}
|
||||
|
@ -566,7 +567,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
|
|||
if (pMeta == NULL) {
|
||||
return;
|
||||
}
|
||||
(void) taosRemoveRef(streamMetaId, pMeta->rid);
|
||||
(void)taosRemoveRef(streamMetaId, pMeta->rid);
|
||||
}
|
||||
|
||||
void streamMetaCloseImpl(void* arg) {
|
||||
|
@ -583,10 +584,10 @@ void streamMetaCloseImpl(void* arg) {
|
|||
streamMetaWUnLock(pMeta);
|
||||
|
||||
// already log the error, ignore here
|
||||
(void) tdbAbort(pMeta->db, pMeta->txn);
|
||||
(void) tdbTbClose(pMeta->pTaskDb);
|
||||
(void) tdbTbClose(pMeta->pCheckpointDb);
|
||||
(void) tdbClose(pMeta->db);
|
||||
(void)tdbAbort(pMeta->db, pMeta->txn);
|
||||
(void)tdbTbClose(pMeta->pTaskDb);
|
||||
(void)tdbTbClose(pMeta->pCheckpointDb);
|
||||
(void)tdbClose(pMeta->db);
|
||||
|
||||
taosArrayDestroy(pMeta->pTaskList);
|
||||
taosArrayDestroy(pMeta->chkpSaved);
|
||||
|
@ -610,7 +611,7 @@ void streamMetaCloseImpl(void* arg) {
|
|||
bkdMgtDestroy(pMeta->bkdChkptMgt);
|
||||
|
||||
pMeta->role = NODE_ROLE_UNINIT;
|
||||
(void) taosThreadRwlockDestroy(&pMeta->lock);
|
||||
(void)taosThreadRwlockDestroy(&pMeta->lock);
|
||||
|
||||
taosMemoryFree(pMeta);
|
||||
stDebug("vgId:%d end to close stream meta", vgId);
|
||||
|
@ -691,13 +692,13 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
|||
|
||||
p = taosArrayPush(pMeta->pTaskList, &pTask->id);
|
||||
if (p == NULL) {
|
||||
stError("s-task:0x%"PRIx64" failed to register task into meta-list, code: out of memory", id.taskId);
|
||||
stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES);
|
||||
if (code) {
|
||||
stError("s-task:0x%"PRIx64" failed to register task into meta-list, code: out of memory", id.taskId);
|
||||
stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -710,7 +711,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
|||
}
|
||||
|
||||
if (pTask->info.fillHistory == 0) {
|
||||
(void) atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||
(void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||
}
|
||||
|
||||
*pAdded = true;
|
||||
|
@ -779,7 +780,7 @@ static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id
|
|||
|
||||
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
(void) streamTaskSendCheckpointSourceRsp(pTask);
|
||||
(void)streamTaskSendCheckpointSourceRsp(pTask);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -802,7 +803,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|||
}
|
||||
|
||||
// handle the dropping event
|
||||
(void) streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
|
||||
(void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
|
||||
} else {
|
||||
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
@ -841,12 +842,12 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|||
pTask = *ppTask;
|
||||
// it is an fill-history task, remove the related stream task's id that points to it
|
||||
if (pTask->info.fillHistory == 0) {
|
||||
(void) atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||
(void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||
}
|
||||
|
||||
(void) taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
||||
(void)taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
||||
doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
|
||||
(void) streamMetaRemoveTask(pMeta, &id);
|
||||
(void)streamMetaRemoveTask(pMeta, &id);
|
||||
|
||||
ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList));
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
@ -854,7 +855,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|||
ASSERT(pTask->status.timerActive == 0);
|
||||
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
|
||||
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt);
|
||||
(void) taosTmrStop(pTask->schedInfo.pDelayTimer);
|
||||
(void)taosTmrStop(pTask->schedInfo.pDelayTimer);
|
||||
pTask->info.delaySchedParam = 0;
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
}
|
||||
|
@ -915,7 +916,7 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
|
|||
|
||||
code = tdbTbcMoveToFirst(pCur);
|
||||
if (code) {
|
||||
(void) tdbTbcClose(pCur);
|
||||
(void)tdbTbcClose(pCur);
|
||||
stError("failed to open stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
|
||||
return checkpointId;
|
||||
}
|
||||
|
@ -953,7 +954,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
SDecoder decoder;
|
||||
int32_t vgId = 0;
|
||||
int32_t code = 0;
|
||||
SArray* pRecycleList = NULL;
|
||||
SArray* pRecycleList = NULL;
|
||||
|
||||
if (pMeta == NULL) {
|
||||
return;
|
||||
|
@ -975,7 +976,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
if (code) {
|
||||
stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
|
||||
taosArrayDestroy(pRecycleList);
|
||||
(void) tdbTbcClose(pCur);
|
||||
(void)tdbTbcClose(pCur);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1008,7 +1009,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
tFreeStreamTask(pTask);
|
||||
|
||||
STaskId id = streamTaskGetTaskId(pTask);
|
||||
(void) taosArrayPush(pRecycleList, &id);
|
||||
(void)taosArrayPush(pRecycleList, &id);
|
||||
|
||||
int32_t total = taosArrayGetSize(pRecycleList);
|
||||
stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
|
||||
|
@ -1029,7 +1030,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
continue;
|
||||
}
|
||||
|
||||
(void) taosArrayPush(pMeta->pTaskList, &pTask->id);
|
||||
(void)taosArrayPush(pMeta->pTaskList, &pTask->id);
|
||||
} else {
|
||||
// todo this should replace the existed object put by replay creating stream task msg from mnode
|
||||
stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
|
||||
|
@ -1039,17 +1040,17 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
|
||||
if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) != 0) {
|
||||
stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno));
|
||||
(void) taosArrayPop(pMeta->pTaskList);
|
||||
(void)taosArrayPop(pMeta->pTaskList);
|
||||
tFreeStreamTask(pTask);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pTask->info.fillHistory == 0) {
|
||||
(void) atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||
(void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||
}
|
||||
|
||||
if (streamTaskShouldPause(pTask)) {
|
||||
(void) atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||
(void)atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||
}
|
||||
|
||||
ASSERT(pTask->status.downstreamReady == 0);
|
||||
|
@ -1065,7 +1066,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
if (taosArrayGetSize(pRecycleList) > 0) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
|
||||
STaskId* pId = taosArrayGet(pRecycleList, i);
|
||||
(void) streamMetaRemoveTask(pMeta, pId);
|
||||
(void)streamMetaRemoveTask(pMeta, pId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1093,7 +1094,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
|
|||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
if (pTask->status.timerActive >= 1) {
|
||||
stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart, set closing again", pTask->id.idStr, pMeta->vgId);
|
||||
(void) streamTaskStop(pTask);
|
||||
(void)streamTaskStop(pTask);
|
||||
inTimer = true;
|
||||
}
|
||||
}
|
||||
|
@ -1126,7 +1127,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
|||
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
stDebug("vgId:%d s-task:%s set task closing flag", vgId, pTask->id.idStr);
|
||||
(void) streamTaskStop(pTask);
|
||||
(void)streamTaskStop(pTask);
|
||||
}
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
@ -1173,7 +1174,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
|
|||
|
||||
void streamMetaRLock(SStreamMeta* pMeta) {
|
||||
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
|
||||
(void) taosThreadRwlockRdlock(&pMeta->lock);
|
||||
(void)taosThreadRwlockRdlock(&pMeta->lock);
|
||||
}
|
||||
|
||||
void streamMetaRUnLock(SStreamMeta* pMeta) {
|
||||
|
@ -1188,13 +1189,13 @@ void streamMetaRUnLock(SStreamMeta* pMeta) {
|
|||
|
||||
void streamMetaWLock(SStreamMeta* pMeta) {
|
||||
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
|
||||
(void) taosThreadRwlockWrlock(&pMeta->lock);
|
||||
(void)taosThreadRwlockWrlock(&pMeta->lock);
|
||||
// stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
|
||||
}
|
||||
|
||||
void streamMetaWUnLock(SStreamMeta* pMeta) {
|
||||
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
|
||||
(void) taosThreadRwlockUnlock(&pMeta->lock);
|
||||
(void)taosThreadRwlockUnlock(&pMeta->lock);
|
||||
}
|
||||
|
||||
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
|
||||
|
@ -1320,7 +1321,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
|||
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
|
||||
if (pTask == NULL) {
|
||||
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
|
||||
(void) streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||
(void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1343,7 +1344,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
|||
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
|
||||
if (pTask == NULL) {
|
||||
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
|
||||
(void) streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||
(void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1361,10 +1362,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
|||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
|
||||
pTask->id.idStr);
|
||||
(void) streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task?
|
||||
(void)streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task?
|
||||
}
|
||||
|
||||
(void) streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, true);
|
||||
(void)streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs,
|
||||
true);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
continue;
|
||||
}
|
||||
|
@ -1420,14 +1422,14 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
|||
|
||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||
SStreamTask* pTask = NULL;
|
||||
SStreamTask* pTask = NULL;
|
||||
|
||||
code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
continue;
|
||||
}
|
||||
|
||||
(void) streamTaskStop(pTask);
|
||||
(void)streamTaskStop(pTask);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
}
|
||||
|
||||
|
@ -1467,7 +1469,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
|
|||
code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
|
||||
if (pTask == NULL) {
|
||||
stError("vgId:%d failed to acquire task:0x%x when starting task", pMeta->vgId, taskId);
|
||||
(void) streamMetaAddFailedTask(pMeta, streamId, taskId);
|
||||
(void)streamMetaAddFailedTask(pMeta, streamId, taskId);
|
||||
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
||||
}
|
||||
|
||||
|
@ -1558,9 +1560,8 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
|
|||
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
|
||||
|
||||
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
|
||||
int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
|
||||
int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
|
||||
if (code) {
|
||||
|
||||
}
|
||||
|
||||
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
|
||||
|
@ -1632,9 +1633,9 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
|
|||
streamMetaRUnLock(pMeta);
|
||||
|
||||
// add the failed task info, along with the related fill-history task info into tasks list.
|
||||
(void) streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
|
||||
(void)streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
|
||||
if (hasFillhistoryTask) {
|
||||
(void) streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
|
||||
(void)streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
|
||||
}
|
||||
} else {
|
||||
streamMetaRUnLock(pMeta);
|
||||
|
@ -1649,12 +1650,12 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
|
|||
|
||||
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) {
|
||||
int32_t startTs = pTask->execInfo.checkTs;
|
||||
(void) streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
|
||||
(void)streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
|
||||
|
||||
// automatically set the related fill-history task to be failed.
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
STaskId* pId = &pTask->hTaskInfo.id;
|
||||
(void) streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
|
||||
(void)streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1662,7 +1663,7 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt
|
|||
int64_t startTs) {
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
int32_t code = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
// keep the already updated info
|
||||
STaskUpdateEntry entry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId};
|
||||
|
|
|
@ -69,7 +69,7 @@ void *backendOpen() {
|
|||
key.ts = ts;
|
||||
const char *val = "value data";
|
||||
int32_t vlen = strlen(val);
|
||||
int32_t code = streamStatePut_rocksdb(p, &key, (char *)val, vlen);
|
||||
int32_t code = streamStatePut_rocksdb(p, &key, (char *)val, vlen);
|
||||
ASSERT(code == 0);
|
||||
|
||||
tsArray.push_back(ts);
|
||||
|
@ -83,7 +83,7 @@ void *backendOpen() {
|
|||
const char *val = "value data";
|
||||
int32_t len = 0;
|
||||
char *newVal = NULL;
|
||||
int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
|
||||
int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
|
||||
ASSERT(code == 0);
|
||||
|
||||
ASSERT(len == strlen(val));
|
||||
|
@ -377,7 +377,7 @@ TEST_F(BackendEnv, checkOpen) {
|
|||
char val[128] = {0};
|
||||
sprintf(val, "val_%d", i);
|
||||
int32_t code = streamStatePutBatch(p, "default", (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
|
||||
(int32_t)(strlen(val)), tsStart + 100000);
|
||||
(int32_t)(strlen(val)), tsStart + 100000);
|
||||
ASSERT(code == 0);
|
||||
}
|
||||
|
||||
|
@ -396,7 +396,7 @@ TEST_F(BackendEnv, checkOpen) {
|
|||
char val[128] = {0};
|
||||
sprintf(val, "val_%d", i);
|
||||
int32_t code = streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
|
||||
(int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf);
|
||||
(int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf);
|
||||
ASSERT(code == 0);
|
||||
}
|
||||
int32_t code = streamStatePutBatch_rocksdb(p, pBatch);
|
||||
|
@ -417,7 +417,7 @@ TEST_F(BackendEnv, checkOpen) {
|
|||
char val[128] = {0};
|
||||
sprintf(val, "val_%d", i);
|
||||
int32_t code = streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
|
||||
(int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf);
|
||||
(int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf);
|
||||
ASSERT(code == 0);
|
||||
}
|
||||
code = streamStatePutBatch_rocksdb(p, pBatch);
|
||||
|
@ -432,13 +432,12 @@ TEST_F(BackendEnv, checkOpen) {
|
|||
const char *path = "/tmp/backend/stream";
|
||||
const char *dump = "/tmp/backend/stream/dump";
|
||||
// taosMkDir(dump);
|
||||
code = taosMulMkDir(dump);
|
||||
ASSERT(code == 0);
|
||||
taosMulMkDir(dump);
|
||||
SBkdMgt *mgt = NULL;
|
||||
|
||||
SBkdMgt *mgt = bkdMgtCreate((char *)path);
|
||||
SArray *result = taosArrayInit(4, sizeof(void *));
|
||||
code = bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump);
|
||||
ASSERT(code == 0);
|
||||
code = bkdMgtCreate((char *)path, &mgt);
|
||||
SArray *result = taosArrayInit(4, sizeof(void *));
|
||||
bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump);
|
||||
|
||||
code = taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4, 0);
|
||||
ASSERT(code == 0);
|
||||
|
@ -475,7 +474,7 @@ TEST_F(BackendEnv, backendUtil) {
|
|||
}
|
||||
TEST_F(BackendEnv, oldBackendInit) {
|
||||
const char *path = "/tmp/backend1";
|
||||
int32_t code = taosMulMkDir(path);
|
||||
int32_t code = taosMulMkDir(path);
|
||||
ASSERT(code == 0);
|
||||
|
||||
{
|
||||
|
|
|
@ -53,7 +53,7 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
|
|||
void *p = NULL;
|
||||
// SBackendWrapper *p = streamBackendInit(streamPath, -1, 2);
|
||||
// p = taskDbOpen((char *)streamPath, (char *)"test", -1);
|
||||
p = bkdMgtCreate((char *)streamPath);
|
||||
int32_t code = bkdMgtCreate((char *)streamPath, (SBkdMgt **)&p);
|
||||
|
||||
// const int64_t interval = 20 * 1000;
|
||||
// const int64_t watermark = 10 * 60 * 1000;
|
||||
|
|
|
@ -134,7 +134,7 @@ int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI
|
|||
}
|
||||
|
||||
SSnapshot snapshot = {0};
|
||||
(void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); // TODO: check the return code
|
||||
(void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
|
||||
if (prevIndex == snapshot.lastApplyIndex) {
|
||||
*pSyncTerm = snapshot.lastApplyTerm;
|
||||
return 0;
|
||||
|
@ -184,7 +184,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
|||
|
||||
int32_t code = 0, lino = 0;
|
||||
SSnapshot snapshot = {0};
|
||||
pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
|
||||
TAOS_CHECK_EXIT(pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot));
|
||||
|
||||
SyncIndex commitIndex = snapshot.lastApplyIndex;
|
||||
SyncTerm commitTerm = TMAX(snapshot.lastApplyTerm, 0);
|
||||
|
|
|
@ -86,14 +86,17 @@ int32_t snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex, SSyncSn
|
|||
pSender->replicaIndex = replicaIndex;
|
||||
pSender->term = raftStoreGetTerm(pSyncNode);
|
||||
pSender->startTime = -1;
|
||||
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
|
||||
pSender->finish = false;
|
||||
|
||||
code = pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
|
||||
if (code != 0) {
|
||||
taosMemoryFreeClear(pSender);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
SSyncSnapBuffer *pSndBuf = NULL;
|
||||
code = syncSnapBufferCreate(&pSndBuf);
|
||||
if (pSndBuf == NULL) {
|
||||
taosMemoryFree(pSender);
|
||||
pSender = NULL;
|
||||
taosMemoryFreeClear(pSender);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
pSndBuf->entryDeleteCb = syncSnapBlockDestroy;
|
||||
|
@ -472,7 +475,7 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
|
|||
syncSnapBufferDestroy(&pReceiver->pRcvBuf);
|
||||
}
|
||||
|
||||
snapshotReceiverClearInfoData(pReceiver);
|
||||
(void)snapshotReceiverClearInfoData(pReceiver);
|
||||
|
||||
// free receiver
|
||||
taosMemoryFree(pReceiver);
|
||||
|
@ -592,7 +595,7 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
|
|||
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
|
||||
&pReceiver->snapshot);
|
||||
if (code != 0) {
|
||||
sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code));
|
||||
sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
pReceiver->pWriter = NULL;
|
||||
|
@ -603,7 +606,11 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
|
|||
|
||||
// get fsmState
|
||||
SSnapshot snapshot = {0};
|
||||
pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
|
||||
code = pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
|
||||
if (code != 0) {
|
||||
sRError(pReceiver, "snapshot receiver get snapshot info failed since %s", tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
pReceiver->pSyncNode->fsmState = snapshot.state;
|
||||
|
||||
// reset wal
|
||||
|
@ -1276,13 +1283,13 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
|
|||
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
|
||||
sSInfo(pSender, "process end rsp");
|
||||
snapshotSenderStop(pSender, true);
|
||||
syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
|
||||
(void)syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
_ERROR:
|
||||
snapshotSenderStop(pSender, false);
|
||||
syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
|
||||
(void)syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
|
|
@ -183,7 +183,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
|
|||
|
||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||
if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||
pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
|
||||
(void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
|
||||
}
|
||||
|
||||
SyncIndex logLastIndex = SYNC_INDEX_INVALID;
|
||||
|
@ -253,7 +253,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
|
|||
|
||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||
if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||
pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
|
||||
(void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
|
||||
}
|
||||
|
||||
SyncIndex logLastIndex = SYNC_INDEX_INVALID;
|
||||
|
@ -302,7 +302,7 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
|
|||
|
||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||
if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||
pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
|
||||
(void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
|
||||
}
|
||||
|
||||
SyncIndex logLastIndex = SYNC_INDEX_INVALID;
|
||||
|
|
|
@ -483,7 +483,7 @@ const STfsFile *tfsReaddir(STfsDir *pTfsDir) {
|
|||
void tfsClosedir(STfsDir *pTfsDir) {
|
||||
if (pTfsDir) {
|
||||
if (pTfsDir->pDir != NULL) {
|
||||
taosCloseDir(&pTfsDir->pDir);
|
||||
(void)taosCloseDir(&pTfsDir->pDir);
|
||||
pTfsDir->pDir = NULL;
|
||||
}
|
||||
taosMemoryFree(pTfsDir);
|
||||
|
|
Loading…
Reference in New Issue