fix
This commit is contained in:
parent
a7533ac2b2
commit
b7788aca04
|
@ -66,6 +66,7 @@ struct tmq_t {
|
|||
STscObj* pTscObj;
|
||||
tmq_commit_cb* commit_cb;
|
||||
int32_t nextTopicIdx;
|
||||
int32_t waitingRequest;
|
||||
SArray* clientTopics; // SArray<SMqClientTopic>
|
||||
STaosQueue* mqueue; // queue of tmq_message_t
|
||||
STaosQall* qall;
|
||||
|
@ -209,12 +210,7 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
if (pParam->tmq->commit_cb) {
|
||||
pParam->tmq->commit_cb(pParam->tmq, rspErr, NULL, NULL);
|
||||
}
|
||||
if (!pParam->async)
|
||||
tsem_post(&pParam->rspSem);
|
||||
else {
|
||||
tsem_destroy(&pParam->rspSem);
|
||||
free(param);
|
||||
}
|
||||
if (!pParam->async) tsem_post(&pParam->rspSem);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -243,6 +239,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
|
|||
pTmq->status = 0;
|
||||
pTmq->pollCnt = 0;
|
||||
pTmq->epoch = 0;
|
||||
pTmq->waitingRequest = 0;
|
||||
// set conf
|
||||
strcpy(pTmq->clientId, conf->clientId);
|
||||
strcpy(pTmq->groupId, conf->groupId);
|
||||
|
@ -315,6 +312,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
|
|||
}
|
||||
pParam->tmq = tmq;
|
||||
tsem_init(&pParam->rspSem, 0, 0);
|
||||
pParam->async = async;
|
||||
|
||||
pRequest->body.requestMsg = (SDataBuf){
|
||||
.pData = buf,
|
||||
|
@ -335,6 +333,9 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
|
|||
resp = pParam->rspErr;
|
||||
}
|
||||
|
||||
tsem_destroy(&pParam->rspSem);
|
||||
free(pParam);
|
||||
|
||||
if (pArray) {
|
||||
taosArrayDestroy(pArray);
|
||||
}
|
||||
|
@ -576,7 +577,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
|
|||
|
||||
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
|
||||
if (tmq_message == NULL) return 0;
|
||||
SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message;
|
||||
SMqConsumeRsp* pRsp = &tmq_message->consumeRsp;
|
||||
return pRsp->skipLogNum;
|
||||
}
|
||||
|
||||
|
@ -625,13 +626,14 @@ void tmqShowMsg(tmq_message_t* tmq_message) {
|
|||
}
|
||||
|
||||
int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
printf("recv poll\n");
|
||||
/*printf("recv poll\n");*/
|
||||
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
|
||||
SMqClientVg* pVg = pParam->pVg;
|
||||
tmq_t* tmq = pParam->tmq;
|
||||
if (code != 0) {
|
||||
printf("msg discard\n");
|
||||
if (pParam->epoch == tmq->epoch) {
|
||||
atomic_sub_fetch_32(&tmq->waitingRequest, 1);
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
}
|
||||
return 0;
|
||||
|
@ -646,6 +648,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
|
||||
if (msgEpoch != tmqEpoch) {
|
||||
printf("mismatch rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);
|
||||
} else {
|
||||
atomic_sub_fetch_32(&tmq->waitingRequest, 1);
|
||||
}
|
||||
|
||||
/*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/
|
||||
|
@ -658,7 +662,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp);
|
||||
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
|
||||
if (pRsp->consumeRsp.numOfTopics == 0) {
|
||||
printf("no data\n");
|
||||
/*printf("no data\n");*/
|
||||
if (pParam->epoch == tmq->epoch) {
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
}
|
||||
|
@ -667,7 +671,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
}
|
||||
pRsp->extra = pParam->pVg;
|
||||
taosWriteQitem(tmq->mqueue, pRsp);
|
||||
printf("poll in queue\n");
|
||||
|
||||
/*printf("poll in queue\n");*/
|
||||
/*pParam->rspMsg = (tmq_message_t*)pRsp;*/
|
||||
/*pVg->currentOffset = pRsp->consumeRsp.rspOffset;*/
|
||||
|
||||
|
@ -743,7 +748,6 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
}
|
||||
|
||||
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
||||
printf("ask ep sync %d\n", sync);
|
||||
int32_t tlen = sizeof(SMqCMGetSubEpReq);
|
||||
SMqCMGetSubEpReq* buf = malloc(tlen);
|
||||
if (buf == NULL) {
|
||||
|
@ -812,7 +816,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
|
|||
return TMQ_RESP_ERR__FAIL;
|
||||
}
|
||||
|
||||
SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
||||
SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
||||
int64_t reqOffset;
|
||||
if (pVg->currentOffset >= 0) {
|
||||
reqOffset = pVg->currentOffset;
|
||||
|
@ -832,7 +836,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie
|
|||
strcpy(pReq->topic, pTopic->topicName);
|
||||
strcpy(pReq->cgroup, tmq->groupId);
|
||||
|
||||
pReq->blockingTime = blocking_time;
|
||||
pReq->blockingTime = blockingTime;
|
||||
pReq->consumerId = tmq->consumerId;
|
||||
pReq->epoch = tmq->epoch;
|
||||
pReq->currentOffset = reqOffset;
|
||||
|
@ -863,7 +867,7 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
|
|||
}
|
||||
|
||||
int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
||||
printf("call poll\n");
|
||||
/*printf("call poll\n");*/
|
||||
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
||||
|
@ -900,7 +904,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
|||
sendInfo->fp = tmqPollCb;
|
||||
|
||||
int64_t transporterId = 0;
|
||||
printf("send poll\n");
|
||||
/*printf("send poll\n");*/
|
||||
atomic_add_fetch_32(&tmq->waitingRequest, 1);
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
|
||||
pVg->pollCnt++;
|
||||
tmq->pollCnt++;
|
||||
|
@ -912,7 +917,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
|||
// return
|
||||
int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) {
|
||||
if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__EP_RSP) {
|
||||
printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);
|
||||
/*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
|
||||
if (rspMsg->head.epoch > atomic_load_32(&tmq->epoch)) {
|
||||
tmqUpdateEp(tmq, rspMsg->head.epoch, &rspMsg->getEpRsp);
|
||||
tmqClearUnhandleMsg(tmq);
|
||||
|
@ -935,9 +940,9 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
|
|||
}
|
||||
|
||||
if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
|
||||
printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);
|
||||
/*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/
|
||||
if (rspMsg->head.epoch == atomic_load_32(&tmq->epoch)) {
|
||||
printf("epoch match\n");
|
||||
/*printf("epoch match\n");*/
|
||||
SMqClientVg* pVg = rspMsg->extra;
|
||||
pVg->currentOffset = rspMsg->consumeRsp.rspOffset;
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
|
@ -947,7 +952,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
|
|||
taosFreeQitem(rspMsg);
|
||||
}
|
||||
} else {
|
||||
printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);
|
||||
/*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
|
||||
bool reset = false;
|
||||
tmqHandleRes(tmq, rspMsg, &reset);
|
||||
taosFreeQitem(rspMsg);
|
||||
|
@ -972,12 +977,16 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
|||
if (rspMsg == NULL) {
|
||||
taosReadAllQitems(tmq->mqueue, tmq->qall);
|
||||
}
|
||||
tmqHandleAllRsp(tmq, blocking_time, false);
|
||||
|
||||
tmqPollImpl(tmq, blocking_time);
|
||||
rspMsg = tmqHandleAllRsp(tmq, blocking_time, false);
|
||||
if (rspMsg) {
|
||||
return rspMsg;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
/*printf("cycle\n");*/
|
||||
if (atomic_load_32(&tmq->waitingRequest) == 0) {
|
||||
tmqPollImpl(tmq, blocking_time);
|
||||
}
|
||||
taosReadAllQitems(tmq->mqueue, tmq->qall);
|
||||
rspMsg = tmqHandleAllRsp(tmq, blocking_time, true);
|
||||
if (rspMsg) {
|
||||
|
@ -986,7 +995,6 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
|||
if (blocking_time != 0) {
|
||||
int64_t endTime = taosGetTimestampMs();
|
||||
if (endTime - startTime > blocking_time) {
|
||||
printf("normal exit\n");
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -214,6 +214,10 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SA
|
|||
//}
|
||||
|
||||
static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) {
|
||||
if (pHandle->tbIdHash) {
|
||||
taosHashClear(pHandle->tbIdHash);
|
||||
}
|
||||
|
||||
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -228,6 +232,23 @@ static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const S
|
|||
return 0;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) {
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||
int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i);
|
||||
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
|
||||
bool tqNextDataBlock(STqReadHandle *pHandle);
|
||||
int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo);
|
||||
|
|
|
@ -72,6 +72,8 @@ void tqClose(STQ* pTq) {
|
|||
}
|
||||
|
||||
int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) {
|
||||
// if waiting
|
||||
// memcpy and send msg to fetch thread
|
||||
// TODO: add reference
|
||||
// if handle waiting, launch query and response to consumer
|
||||
//
|
||||
|
@ -210,7 +212,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SMqConsumeReq* pReq = pMsg->pCont;
|
||||
int64_t consumerId = pReq->consumerId;
|
||||
int64_t fetchOffset;
|
||||
/*int64_t blockingTime = pReq->blockingTime;*/
|
||||
int64_t blockingTime = pReq->blockingTime;
|
||||
|
||||
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
||||
fetchOffset = 0;
|
||||
|
|
|
@ -105,7 +105,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA
|
|||
|
||||
SStreamBlockScanInfo* pScanInfo = pInfo->info;
|
||||
if (isAdd) {
|
||||
int32_t code = tqReadHandleSetTbUidList(pScanInfo->readerHandle, tableIdList);
|
||||
int32_t code = tqReadHandleAddTbUidList(pScanInfo->readerHandle, tableIdList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -162,8 +162,8 @@ int32_t walEndSnapshot(SWal *pWal) {
|
|||
}
|
||||
// iterate files, until the searched result
|
||||
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
|
||||
if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize)
|
||||
|| (pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) {
|
||||
if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize) ||
|
||||
(pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) {
|
||||
// delete according to file size or close time
|
||||
deleteCnt++;
|
||||
newTotSize -= iter->fileSize;
|
||||
|
@ -279,6 +279,7 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
|
|||
} else {
|
||||
// reject skip log or rewrite log
|
||||
// must truncate explicitly first
|
||||
terrno = TSDB_CODE_WAL_INVALID_VER;
|
||||
return -1;
|
||||
}
|
||||
/*if (!tfValid(pWal->pWriteLogTFile)) return -1;*/
|
||||
|
@ -303,16 +304,18 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
|
|||
|
||||
if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
|
||||
// ftruncate
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
||||
strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) {
|
||||
// ftruncate
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
||||
strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
code = walWriteIndex(pWal, index, offset);
|
||||
|
@ -329,7 +332,7 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
|
|||
|
||||
pthread_mutex_unlock(&pWal->mutex);
|
||||
|
||||
return code;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void walFsync(SWal *pWal, bool forceFsync) {
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// clang-format off
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
|
@ -407,6 +409,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Unexpected generic error in wal")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_INVALID_VER, "WAL use invalid version")
|
||||
|
||||
// tfs
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory")
|
||||
|
|
Loading…
Reference in New Issue