From b7788aca04f161e539f49b04fae5349e62948013 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 2 Mar 2022 16:22:43 +0800 Subject: [PATCH] fix --- source/client/src/tmq.c | 54 +++++++++++++---------- source/dnode/vnode/inc/meta.h | 12 ++--- source/dnode/vnode/inc/vnode.h | 21 +++++++++ source/dnode/vnode/src/inc/metaDef.h | 2 +- source/dnode/vnode/src/meta/metaBDBImpl.c | 34 +++++++------- source/dnode/vnode/src/tq/tq.c | 4 +- source/libs/executor/src/executor.c | 8 ++-- source/libs/wal/src/walWrite.c | 13 +++--- source/util/src/terror.c | 3 ++ 9 files changed, 94 insertions(+), 57 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index d9ab23b9fa..ab955be491 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -66,6 +66,7 @@ struct tmq_t { STscObj* pTscObj; tmq_commit_cb* commit_cb; int32_t nextTopicIdx; + int32_t waitingRequest; SArray* clientTopics; // SArray STaosQueue* mqueue; // queue of tmq_message_t STaosQall* qall; @@ -209,12 +210,7 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { if (pParam->tmq->commit_cb) { pParam->tmq->commit_cb(pParam->tmq, rspErr, NULL, NULL); } - if (!pParam->async) - tsem_post(&pParam->rspSem); - else { - tsem_destroy(&pParam->rspSem); - free(param); - } + if (!pParam->async) tsem_post(&pParam->rspSem); return 0; } @@ -243,6 +239,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs pTmq->status = 0; pTmq->pollCnt = 0; pTmq->epoch = 0; + pTmq->waitingRequest = 0; // set conf strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); @@ -315,6 +312,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in } pParam->tmq = tmq; tsem_init(&pParam->rspSem, 0, 0); + pParam->async = async; pRequest->body.requestMsg = (SDataBuf){ .pData = buf, @@ -335,6 +333,9 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in resp = pParam->rspErr; } + tsem_destroy(&pParam->rspSem); + free(pParam); + if (pArray) { taosArrayDestroy(pArray); } @@ -576,7 +577,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { if (tmq_message == NULL) return 0; - SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message; + SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; return pRsp->skipLogNum; } @@ -625,13 +626,14 @@ void tmqShowMsg(tmq_message_t* tmq_message) { } int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { - printf("recv poll\n"); + /*printf("recv poll\n");*/ SMqPollCbParam* pParam = (SMqPollCbParam*)param; SMqClientVg* pVg = pParam->pVg; tmq_t* tmq = pParam->tmq; if (code != 0) { printf("msg discard\n"); if (pParam->epoch == tmq->epoch) { + atomic_sub_fetch_32(&tmq->waitingRequest, 1); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); } return 0; @@ -646,6 +648,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { if (msgEpoch != tmqEpoch) { printf("mismatch rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch); + } else { + atomic_sub_fetch_32(&tmq->waitingRequest, 1); } /*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/ @@ -658,7 +662,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ if (pRsp->consumeRsp.numOfTopics == 0) { - printf("no data\n"); + /*printf("no data\n");*/ if (pParam->epoch == tmq->epoch) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); } @@ -667,7 +671,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { } pRsp->extra = pParam->pVg; taosWriteQitem(tmq->mqueue, pRsp); - printf("poll in queue\n"); + + /*printf("poll in queue\n");*/ /*pParam->rspMsg = (tmq_message_t*)pRsp;*/ /*pVg->currentOffset = pRsp->consumeRsp.rspOffset;*/ @@ -743,7 +748,6 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { } int32_t tmqAskEp(tmq_t* tmq, bool sync) { - printf("ask ep sync %d\n", sync); int32_t tlen = sizeof(SMqCMGetSubEpReq); SMqCMGetSubEpReq* buf = malloc(tlen); if (buf == NULL) { @@ -812,7 +816,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { return TMQ_RESP_ERR__FAIL; } -SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClientTopic* pTopic, SMqClientVg* pVg) { +SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) { int64_t reqOffset; if (pVg->currentOffset >= 0) { reqOffset = pVg->currentOffset; @@ -832,7 +836,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie strcpy(pReq->topic, pTopic->topicName); strcpy(pReq->cgroup, tmq->groupId); - pReq->blockingTime = blocking_time; + pReq->blockingTime = blockingTime; pReq->consumerId = tmq->consumerId; pReq->epoch = tmq->epoch; pReq->currentOffset = reqOffset; @@ -863,7 +867,7 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { } int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { - printf("call poll\n"); + /*printf("call poll\n");*/ for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { @@ -900,7 +904,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { sendInfo->fp = tmqPollCb; int64_t transporterId = 0; - printf("send poll\n"); + /*printf("send poll\n");*/ + atomic_add_fetch_32(&tmq->waitingRequest, 1); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); pVg->pollCnt++; tmq->pollCnt++; @@ -912,7 +917,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { // return int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) { if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__EP_RSP) { - printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch); + /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/ if (rspMsg->head.epoch > atomic_load_32(&tmq->epoch)) { tmqUpdateEp(tmq, rspMsg->head.epoch, &rspMsg->getEpRsp); tmqClearUnhandleMsg(tmq); @@ -935,9 +940,9 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese } if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { - printf("handle poll rsp %d\n", rspMsg->head.mqMsgType); + /*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/ if (rspMsg->head.epoch == atomic_load_32(&tmq->epoch)) { - printf("epoch match\n"); + /*printf("epoch match\n");*/ SMqClientVg* pVg = rspMsg->extra; pVg->currentOffset = rspMsg->consumeRsp.rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); @@ -947,7 +952,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese taosFreeQitem(rspMsg); } } else { - printf("handle ep rsp %d\n", rspMsg->head.mqMsgType); + /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/ bool reset = false; tmqHandleRes(tmq, rspMsg, &reset); taosFreeQitem(rspMsg); @@ -972,12 +977,16 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { if (rspMsg == NULL) { taosReadAllQitems(tmq->mqueue, tmq->qall); } - tmqHandleAllRsp(tmq, blocking_time, false); - - tmqPollImpl(tmq, blocking_time); + rspMsg = tmqHandleAllRsp(tmq, blocking_time, false); + if (rspMsg) { + return rspMsg; + } while (1) { /*printf("cycle\n");*/ + if (atomic_load_32(&tmq->waitingRequest) == 0) { + tmqPollImpl(tmq, blocking_time); + } taosReadAllQitems(tmq->mqueue, tmq->qall); rspMsg = tmqHandleAllRsp(tmq, blocking_time, true); if (rspMsg) { @@ -986,7 +995,6 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { if (blocking_time != 0) { int64_t endTime = taosGetTimestampMs(); if (endTime - startTime > blocking_time) { - printf("normal exit\n"); return NULL; } } diff --git a/source/dnode/vnode/inc/meta.h b/source/dnode/vnode/inc/meta.h index c98a97d0dc..fd079b8f32 100644 --- a/source/dnode/vnode/inc/meta.h +++ b/source/dnode/vnode/inc/meta.h @@ -24,8 +24,8 @@ extern "C" { #endif -#define META_SUPER_TABLE TD_SUPER_TABLE -#define META_CHILD_TABLE TD_CHILD_TABLE +#define META_SUPER_TABLE TD_SUPER_TABLE +#define META_CHILD_TABLE TD_CHILD_TABLE #define META_NORMAL_TABLE TD_NORMAL_TABLE // Types exported @@ -50,14 +50,14 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid); int metaCommit(SMeta *pMeta); // For Query -STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); -STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); +STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); +STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); -STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); +STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); -char * metaTbCursorNext(SMTbCursor *pTbCur); +char *metaTbCursorNext(SMTbCursor *pTbCur); SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid); void metaCloseCtbCurosr(SMCtbCursor *pCtbCur); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 31f04e840a..2c6bfd879a 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -214,6 +214,10 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SA //} static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) { + if (pHandle->tbIdHash) { + taosHashClear(pHandle->tbIdHash); + } + pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); if (pHandle->tbIdHash == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -228,6 +232,23 @@ static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const S return 0; } +static FORCE_INLINE int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) { + if (pHandle->tbIdHash == NULL) { + pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + if (pHandle->tbIdHash == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } + + for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { + int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i); + taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); + } + + return 0; +} + int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReadHandle *pHandle); int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo); diff --git a/source/dnode/vnode/src/inc/metaDef.h b/source/dnode/vnode/src/inc/metaDef.h index f4128f7170..71bfd91356 100644 --- a/source/dnode/vnode/src/inc/metaDef.h +++ b/source/dnode/vnode/src/inc/metaDef.h @@ -76,4 +76,4 @@ struct SMeta { } #endif -#endif /*_TD_META_DEF_H_*/ \ No newline at end of file +#endif /*_TD_META_DEF_H_*/ diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index 3c464391d2..f75d9d3db0 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -62,10 +62,10 @@ static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT * static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg); -static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); +static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); static void metaClearTbCfg(STbCfg *pTbCfg); static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW); -static void * metaDecodeSchema(void *buf, SSchemaWrapper *pSW); +static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW); static void metaDBWLock(SMetaDB *pDB); static void metaDBRLock(SMetaDB *pDB); static void metaDBULock(SMetaDB *pDB); @@ -142,7 +142,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { tb_uid_t uid; char buf[512]; char buf1[512]; - void * pBuf; + void *pBuf; DBT key1, value1; DBT key2, value2; SSchema *pSchema = NULL; @@ -394,7 +394,7 @@ static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { STbCfg *pTbCfg = (STbCfg *)(pValue->app_data); - DBT * pDbt; + DBT *pDbt; if (pTbCfg->type == META_CHILD_TABLE) { // pDbt = calloc(2, sizeof(DBT)); @@ -479,7 +479,7 @@ static void metaClearTbCfg(STbCfg *pTbCfg) { /* ------------------------ FOR QUERY ------------------------ */ STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) { - STbCfg * pTbCfg = NULL; + STbCfg *pTbCfg = NULL; SMetaDB *pDB = pMeta->pDB; DBT key = {0}; DBT value = {0}; @@ -509,7 +509,7 @@ STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) { } STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) { - STbCfg * pTbCfg = NULL; + STbCfg *pTbCfg = NULL; SMetaDB *pDB = pMeta->pDB; DBT key = {0}; DBT pkey = {0}; @@ -543,10 +543,10 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) { SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) { uint32_t nCols; SSchemaWrapper *pSW = NULL; - SMetaDB * pDB = pMeta->pDB; + SMetaDB *pDB = pMeta->pDB; int ret; - void * pBuf; - SSchema * pSchema; + void *pBuf; + SSchema *pSchema; SSchemaKey schemaKey = {uid, sver, 0}; DBT key = {0}; DBT value = {0}; @@ -578,7 +578,7 @@ struct SMTbCursor { SMTbCursor *metaOpenTbCursor(SMeta *pMeta) { SMTbCursor *pTbCur = NULL; - SMetaDB * pDB = pMeta->pDB; + SMetaDB *pDB = pMeta->pDB; pTbCur = (SMTbCursor *)calloc(1, sizeof(*pTbCur)); if (pTbCur == NULL) { @@ -609,7 +609,7 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) { DBT key = {0}; DBT value = {0}; STbCfg tbCfg; - void * pBuf; + void *pBuf; for (;;) { if (pTbCur->pCur->get(pTbCur->pCur, &key, &value, DB_NEXT) == 0) { @@ -631,10 +631,10 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) { STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { STSchemaBuilder sb; - STSchema * pTSchema = NULL; - SSchema * pSchema; + STSchema *pTSchema = NULL; + SSchema *pSchema; SSchemaWrapper *pSW; - STbCfg * pTbCfg; + STbCfg *pTbCfg; tb_uid_t quid; pTbCfg = metaGetTbInfoByUid(pMeta, uid); @@ -662,13 +662,13 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { } struct SMCtbCursor { - DBC * pCur; + DBC *pCur; tb_uid_t suid; }; SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { SMCtbCursor *pCtbCur = NULL; - SMetaDB * pDB = pMeta->pDB; + SMetaDB *pDB = pMeta->pDB; int ret; pCtbCur = (SMCtbCursor *)calloc(1, sizeof(*pCtbCur)); @@ -700,7 +700,7 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) { DBT skey = {0}; DBT pkey = {0}; DBT pval = {0}; - void * pBuf; + void *pBuf; STbCfg tbCfg; // Set key diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index aa198d0806..ccad006657 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -72,6 +72,8 @@ void tqClose(STQ* pTq) { } int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { + // if waiting + // memcpy and send msg to fetch thread // TODO: add reference // if handle waiting, launch query and response to consumer // @@ -210,7 +212,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { SMqConsumeReq* pReq = pMsg->pCont; int64_t consumerId = pReq->consumerId; int64_t fetchOffset; - /*int64_t blockingTime = pReq->blockingTime;*/ + int64_t blockingTime = pReq->blockingTime; if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { fetchOffset = 0; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b90ce275d2..19a3874f75 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -95,17 +95,17 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { } int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo* )tinfo; + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; // traverse to the streamscan node to add this table id SOperatorInfo* pInfo = pTaskInfo->pRoot; - while(pInfo->operatorType != OP_StreamScan) { + while (pInfo->operatorType != OP_StreamScan) { pInfo = pInfo->pDownstream[0]; } SStreamBlockScanInfo* pScanInfo = pInfo->info; if (isAdd) { - int32_t code = tqReadHandleSetTbUidList(pScanInfo->readerHandle, tableIdList); + int32_t code = tqReadHandleAddTbUidList(pScanInfo->readerHandle, tableIdList); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -114,4 +114,4 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA } return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 4b1f0ba306..4cc7040119 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -162,8 +162,8 @@ int32_t walEndSnapshot(SWal *pWal) { } // iterate files, until the searched result for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { - if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize) - || (pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) { + if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize) || + (pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) { // delete according to file size or close time deleteCnt++; newTotSize -= iter->fileSize; @@ -279,6 +279,7 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in } else { // reject skip log or rewrite log // must truncate explicitly first + terrno = TSDB_CODE_WAL_INVALID_VER; return -1; } /*if (!tfValid(pWal->pWriteLogTFile)) return -1;*/ @@ -303,16 +304,18 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) { // ftruncate - code = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); + return -1; } if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) { // ftruncate - code = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); + return -1; } code = walWriteIndex(pWal, index, offset); @@ -329,7 +332,7 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in pthread_mutex_unlock(&pWal->mutex); - return code; + return 0; } void walFsync(SWal *pWal, bool forceFsync) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c8cafb93dd..f587c988ad 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +// clang-format off + #define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" @@ -407,6 +409,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Unexpected generic error in wal") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit") +TAOS_DEFINE_ERROR(TSDB_CODE_WAL_INVALID_VER, "WAL use invalid version") // tfs TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory")