Merge pull request #21311 from taosdata/mark/tmq
fix:add lock to pHandle for safety
This commit is contained in:
commit
d457b84aba
|
@ -208,8 +208,7 @@ SWalRef *walRefCommittedVer(SWal *);
|
||||||
|
|
||||||
SWalRef *walOpenRef(SWal *);
|
SWalRef *walOpenRef(SWal *);
|
||||||
void walCloseRef(SWal *pWal, int64_t refId);
|
void walCloseRef(SWal *pWal, int64_t refId);
|
||||||
int32_t walRefVer(SWalRef *, int64_t ver);
|
int32_t walSetRefVer(SWalRef *, int64_t ver);
|
||||||
void walUnrefVer(SWalRef *);
|
|
||||||
|
|
||||||
// helper function for raft
|
// helper function for raft
|
||||||
bool walLogExist(SWal *, int64_t ver);
|
bool walLogExist(SWal *, int64_t ver);
|
||||||
|
|
|
@ -46,23 +46,23 @@ typedef struct STqOffsetStore STqOffsetStore;
|
||||||
|
|
||||||
// tqPush
|
// tqPush
|
||||||
|
|
||||||
typedef struct {
|
//typedef struct {
|
||||||
// msg info
|
// // msg info
|
||||||
int64_t consumerId;
|
// int64_t consumerId;
|
||||||
int64_t reqOffset;
|
// int64_t reqOffset;
|
||||||
int64_t processedVer;
|
// int64_t processedVer;
|
||||||
int32_t epoch;
|
// int32_t epoch;
|
||||||
// rpc info
|
// // rpc info
|
||||||
int64_t reqId;
|
// int64_t reqId;
|
||||||
SRpcHandleInfo rpcInfo;
|
// SRpcHandleInfo rpcInfo;
|
||||||
tmr_h timerId;
|
// tmr_h timerId;
|
||||||
int8_t tmrStopped;
|
// int8_t tmrStopped;
|
||||||
// exec
|
// // exec
|
||||||
int8_t inputStatus;
|
// int8_t inputStatus;
|
||||||
int8_t execStatus;
|
// int8_t execStatus;
|
||||||
SStreamQueue inputQ;
|
// SStreamQueue inputQ;
|
||||||
SRWLatch lock;
|
// SRWLatch lock;
|
||||||
} STqPushHandle;
|
//} STqPushHandle;
|
||||||
|
|
||||||
// tqExec
|
// tqExec
|
||||||
|
|
||||||
|
@ -90,6 +90,11 @@ typedef struct {
|
||||||
int32_t numOfCols; // number of out pout column, temporarily used
|
int32_t numOfCols; // number of out pout column, temporarily used
|
||||||
} STqExecHandle;
|
} STqExecHandle;
|
||||||
|
|
||||||
|
typedef enum tq_handle_status{
|
||||||
|
TMQ_HANDLE_STATUS_IDLE = 0,
|
||||||
|
TMQ_HANDLE_STATUS_EXEC = 1,
|
||||||
|
}tq_handle_status;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
|
@ -98,18 +103,18 @@ typedef struct {
|
||||||
int64_t snapshotVer;
|
int64_t snapshotVer;
|
||||||
SWalReader* pWalReader;
|
SWalReader* pWalReader;
|
||||||
SWalRef* pRef;
|
SWalRef* pRef;
|
||||||
STqPushHandle pushHandle; // push
|
// STqPushHandle pushHandle; // push
|
||||||
STqExecHandle execHandle; // exec
|
STqExecHandle execHandle; // exec
|
||||||
SRpcMsg* msg;
|
SRpcMsg* msg;
|
||||||
int32_t noDataPollCnt;
|
int32_t noDataPollCnt;
|
||||||
int8_t exec;
|
tq_handle_status status;
|
||||||
} STqHandle;
|
} STqHandle;
|
||||||
|
|
||||||
typedef struct {
|
//typedef struct {
|
||||||
SMqDataRsp* pDataRsp;
|
// SMqDataRsp* pDataRsp;
|
||||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
// char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
SRpcHandleInfo info;
|
// SRpcHandleInfo info;
|
||||||
} STqPushEntry;
|
//} STqPushEntry;
|
||||||
|
|
||||||
struct STQ {
|
struct STQ {
|
||||||
SVnode* pVnode;
|
SVnode* pVnode;
|
||||||
|
@ -185,7 +190,6 @@ int32_t tqStreamTasksScanWal(STQ* pTq);
|
||||||
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
|
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
|
||||||
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
|
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
|
||||||
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
||||||
bool tqIsHandleExecuting(STqHandle* pHandle);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,10 @@
|
||||||
|
|
||||||
static int32_t tqInitialize(STQ* pTq);
|
static int32_t tqInitialize(STQ* pTq);
|
||||||
|
|
||||||
|
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; }
|
||||||
|
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_EXEC;}
|
||||||
|
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_IDLE;}
|
||||||
|
|
||||||
int32_t tqInit() {
|
int32_t tqInit() {
|
||||||
int8_t old;
|
int8_t old;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -61,6 +65,7 @@ void tqCleanUp() {
|
||||||
static void destroyTqHandle(void* data) {
|
static void destroyTqHandle(void* data) {
|
||||||
STqHandle* pData = (STqHandle*)data;
|
STqHandle* pData = (STqHandle*)data;
|
||||||
qDestroyTask(pData->execHandle.task);
|
qDestroyTask(pData->execHandle.task);
|
||||||
|
|
||||||
if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
|
taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
|
||||||
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
|
@ -292,10 +297,13 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
if (offset.val.type == TMQ_OFFSET__LOG) {
|
if (offset.val.type == TMQ_OFFSET__LOG) {
|
||||||
|
taosWLockLatch(&pTq->lock);
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
|
||||||
if (pHandle && (walRefVer(pHandle->pRef, offset.val.version) < 0)) {
|
if (pHandle && (walSetRefVer(pHandle->pRef, offset.val.version) < 0)) {
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -340,41 +348,48 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
STqOffsetVal reqOffset = req.reqOffset;
|
STqOffsetVal reqOffset = req.reqOffset;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
|
taosWLockLatch(&pTq->lock);
|
||||||
// 1. find handle
|
// 1. find handle
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
|
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
while (tqIsHandleExec(pHandle)) {
|
||||||
|
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", consumerId, vgId, req.subKey);
|
||||||
|
taosMsleep(5);
|
||||||
|
}
|
||||||
|
|
||||||
// 2. check re-balance status
|
// 2. check re-balance status
|
||||||
taosRLockLatch(&pTq->lock);
|
|
||||||
if (pHandle->consumerId != consumerId) {
|
if (pHandle->consumerId != consumerId) {
|
||||||
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
||||||
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
|
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
|
||||||
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
||||||
taosRUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosRUnLockLatch(&pTq->lock);
|
tqSetHandleExec(pHandle);
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
// 3. update the epoch value
|
// 3. update the epoch value
|
||||||
taosWLockLatch(&pTq->lock);
|
|
||||||
int32_t savedEpoch = pHandle->epoch;
|
int32_t savedEpoch = pHandle->epoch;
|
||||||
if (savedEpoch < reqEpoch) {
|
if (savedEpoch < reqEpoch) {
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
|
||||||
reqEpoch);
|
reqEpoch);
|
||||||
pHandle->epoch = reqEpoch;
|
pHandle->epoch = reqEpoch;
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pTq->lock);
|
|
||||||
|
|
||||||
char buf[80];
|
char buf[80];
|
||||||
tFormatOffset(buf, 80, &reqOffset);
|
tFormatOffset(buf, 80, &reqOffset);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
|
||||||
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
|
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
|
||||||
|
|
||||||
return tqExtractDataForMq(pTq, pHandle, &req, pMsg);
|
int code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
|
||||||
|
tqSetHandleIdle(pHandle);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||||
|
@ -384,18 +399,16 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
|
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
taosWLockLatch(&pTq->lock);
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
// walCloseRef(pHandle->pWalReader->pWal, pHandle->pRef->refId);
|
while (tqIsHandleExec(pHandle)) {
|
||||||
if (pHandle->pRef) {
|
|
||||||
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
|
|
||||||
}
|
|
||||||
|
|
||||||
while (tqIsHandleExecuting(pHandle)) {
|
|
||||||
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
|
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
|
||||||
taosMsleep(5);
|
taosMsleep(5);
|
||||||
}
|
}
|
||||||
|
if (pHandle->pRef) {
|
||||||
|
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
|
||||||
|
}
|
||||||
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
|
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
|
||||||
|
@ -410,6 +423,8 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
|
if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
|
||||||
tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
|
tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
|
||||||
}
|
}
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -456,6 +471,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
|
tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
|
||||||
req.oldConsumerId, req.newConsumerId);
|
req.oldConsumerId, req.newConsumerId);
|
||||||
|
|
||||||
|
taosWLockLatch(&pTq->lock);
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
if (req.oldConsumerId != -1) {
|
if (req.oldConsumerId != -1) {
|
||||||
|
@ -507,7 +523,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
|
pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
|
||||||
|
|
||||||
pHandle->execHandle.execDb.pFilterOutTbUid =
|
pHandle->execHandle.execDb.pFilterOutTbUid =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||||
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
|
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
|
||||||
(SSnapContext**)(&handle.sContext));
|
(SSnapContext**)(&handle.sContext));
|
||||||
|
|
||||||
|
@ -538,6 +554,11 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||||
goto end;
|
goto end;
|
||||||
} else {
|
} else {
|
||||||
|
while (tqIsHandleExec(pHandle)) {
|
||||||
|
tqDebug("sub req vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
|
||||||
|
taosMsleep(5);
|
||||||
|
}
|
||||||
|
|
||||||
if (pHandle->consumerId == req.newConsumerId) { // do nothing
|
if (pHandle->consumerId == req.newConsumerId) { // do nothing
|
||||||
tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
|
tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
|
||||||
atomic_add_fetch_32(&pHandle->epoch, 1);
|
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||||
|
@ -553,22 +574,17 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
if (pTaskInfo != NULL) {
|
if (pTaskInfo != NULL) {
|
||||||
qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
|
qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWLockLatch(&pTq->lock);
|
|
||||||
// remove if it has been register in the push manager, and return one empty block to consumer
|
|
||||||
tqUnregisterPushHandle(pTq, pHandle);
|
|
||||||
|
|
||||||
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
qStreamCloseTsdbReader(pTaskInfo);
|
qStreamCloseTsdbReader(pTaskInfo);
|
||||||
}
|
}
|
||||||
|
// remove if it has been register in the push manager, and return one empty block to consumer
|
||||||
taosWUnLockLatch(&pTq->lock);
|
tqUnregisterPushHandle(pTq, pHandle);
|
||||||
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
end:
|
end:
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
taosMemoryFree(req.qmsg);
|
taosMemoryFree(req.qmsg);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,7 +54,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
|
||||||
if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1;
|
if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1;
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
pHandle->execHandle.execDb.pFilterOutTbUid =
|
pHandle->execHandle.execDb.pFilterOutTbUid =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
if (tDecodeI32(pDecoder, &size) < 0) return -1;
|
if (tDecodeI32(pDecoder, &size) < 0) return -1;
|
||||||
for (int32_t i = 0; i < size; i++) {
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
@ -295,7 +295,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
code = -1;
|
code = -1;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
walRefVer(handle.pRef, handle.snapshotVer);
|
walSetRefVer(handle.pRef, handle.snapshotVer);
|
||||||
|
|
||||||
SReadHandle reader = {
|
SReadHandle reader = {
|
||||||
.meta = pTq->pVnode->pMeta,
|
.meta = pTq->pVnode->pMeta,
|
||||||
|
@ -352,7 +352,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
|
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
|
||||||
}
|
}
|
||||||
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId);
|
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId);
|
||||||
|
taosWLockLatch(&pTq->lock);
|
||||||
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
|
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
end:
|
end:
|
||||||
|
|
|
@ -78,13 +78,15 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
|
||||||
|
|
||||||
// todo remove this
|
// todo remove this
|
||||||
if (offset.val.type == TMQ_OFFSET__LOG) {
|
if (offset.val.type == TMQ_OFFSET__LOG) {
|
||||||
|
taosWLockLatch(&pStore->pTq->lock);
|
||||||
STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey));
|
STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey));
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
|
if (walSetRefVer(pHandle->pRef, offset.val.version) < 0) {
|
||||||
// tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, pHandle->subKey,
|
// tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, pHandle->subKey,
|
||||||
// offset.val.version);
|
// offset.val.version);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
taosWUnLockLatch(&pStore->pTq->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pMemBuf);
|
taosMemoryFree(pMemBuf);
|
||||||
|
|
|
@ -1016,6 +1016,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
// update the table list for each consumer handle
|
// update the table list for each consumer handle
|
||||||
|
taosWLockLatch(&pTq->lock);
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->pHandle, pIter);
|
pIter = taosHashIterate(pTq->pHandle, pIter);
|
||||||
if (pIter == NULL) {
|
if (pIter == NULL) {
|
||||||
|
@ -1072,6 +1073,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
// update the table list handle for each stream scanner/wal reader
|
// update the table list handle for each stream scanner/wal reader
|
||||||
taosWLockLatch(&pTq->pStreamMeta->lock);
|
taosWLockLatch(&pTq->pStreamMeta->lock);
|
||||||
|
|
|
@ -162,8 +162,6 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tqIsHandleExecuting(STqHandle* pHandle) { return 1 == atomic_load_8(&pHandle->exec); }
|
|
||||||
|
|
||||||
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||||
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
|
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
|
||||||
uint64_t consumerId = pRequest->consumerId;
|
uint64_t consumerId = pRequest->consumerId;
|
||||||
|
@ -173,12 +171,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
SMqDataRsp dataRsp = {0};
|
SMqDataRsp dataRsp = {0};
|
||||||
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
||||||
|
|
||||||
while(tqIsHandleExecuting(pHandle)){
|
|
||||||
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
|
|
||||||
taosMsleep(5);
|
|
||||||
}
|
|
||||||
atomic_store_8(&pHandle->exec, 1);
|
|
||||||
|
|
||||||
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
||||||
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
||||||
if(code != 0) {
|
if(code != 0) {
|
||||||
|
@ -195,7 +187,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
|
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
atomic_store_8(&pHandle->exec, 0);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
else{
|
else{
|
||||||
|
@ -214,7 +205,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
|
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
}
|
}
|
||||||
atomic_store_8(&pHandle->exec, 0);
|
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -228,13 +218,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
STaosxRsp taosxRsp = {0};
|
STaosxRsp taosxRsp = {0};
|
||||||
tqInitTaosxRsp(&taosxRsp, pRequest);
|
tqInitTaosxRsp(&taosxRsp, pRequest);
|
||||||
|
|
||||||
while(tqIsHandleExecuting(pHandle)){
|
|
||||||
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
|
|
||||||
taosMsleep(5);
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic_store_8(&pHandle->exec, 1);
|
|
||||||
|
|
||||||
if (offset->type != TMQ_OFFSET__LOG) {
|
if (offset->type != TMQ_OFFSET__LOG) {
|
||||||
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
|
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
|
@ -329,7 +312,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
end:
|
end:
|
||||||
atomic_store_8(&pHandle->exec, 0);
|
|
||||||
|
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
taosMemoryFreeClear(pCkHead);
|
taosMemoryFreeClear(pCkHead);
|
||||||
|
|
|
@ -1060,7 +1060,10 @@ void qStreamSetOpen(qTaskInfo_t tinfo) {
|
||||||
|
|
||||||
void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){
|
void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){
|
||||||
// if offset version is small than first version , let's seek to first version
|
// if offset version is small than first version , let's seek to first version
|
||||||
|
taosThreadMutexLock(&((SWalReader*)pWalReader)->pWal->mutex);
|
||||||
int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal);
|
int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal);
|
||||||
|
taosThreadMutexUnlock(&((SWalReader*)pWalReader)->pWal->mutex);
|
||||||
|
|
||||||
if (pOffset->version + 1 < firstVer){
|
if (pOffset->version + 1 < firstVer){
|
||||||
pOffset->version = firstVer - 1;
|
pOffset->version = firstVer - 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -247,7 +247,9 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
||||||
if (contLen == sizeof(SWalCkHead)) {
|
if (contLen == sizeof(SWalCkHead)) {
|
||||||
break;
|
break;
|
||||||
} else if (contLen == 0 && !seeked) {
|
} else if (contLen == 0 && !seeked) {
|
||||||
walReadSeekVerImpl(pRead, fetchVer);
|
if(walReadSeekVerImpl(pRead, fetchVer) < 0){
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
seeked = true;
|
seeked = true;
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
|
@ -354,7 +356,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
||||||
if (contLen == sizeof(SWalCkHead)) {
|
if (contLen == sizeof(SWalCkHead)) {
|
||||||
break;
|
break;
|
||||||
} else if (contLen == 0 && !seeked) {
|
} else if (contLen == 0 && !seeked) {
|
||||||
walReadSeekVerImpl(pRead, ver);
|
if(walReadSeekVerImpl(pRead, ver) < 0){
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
seeked = true;
|
seeked = true;
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
|
@ -488,7 +492,10 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
|
||||||
if (contLen == sizeof(SWalCkHead)) {
|
if (contLen == sizeof(SWalCkHead)) {
|
||||||
break;
|
break;
|
||||||
} else if (contLen == 0 && !seeked) {
|
} else if (contLen == 0 && !seeked) {
|
||||||
walReadSeekVerImpl(pReader, ver);
|
if(walReadSeekVerImpl(pReader, ver) < 0){
|
||||||
|
taosThreadMutexUnlock(&pReader->mutex);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
seeked = true;
|
seeked = true;
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -45,7 +45,7 @@ void walCloseRef(SWal *pWal, int64_t refId) {
|
||||||
taosMemoryFree(pRef);
|
taosMemoryFree(pRef);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walRefVer(SWalRef *pRef, int64_t ver) {
|
int32_t walSetRefVer(SWalRef *pRef, int64_t ver) {
|
||||||
SWal *pWal = pRef->pWal;
|
SWal *pWal = pRef->pWal;
|
||||||
wDebug("vgId:%d, wal ref version %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, ver, pRef->refId);
|
wDebug("vgId:%d, wal ref version %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, ver, pRef->refId);
|
||||||
if (pRef->refVer != ver) {
|
if (pRef->refVer != ver) {
|
||||||
|
@ -57,26 +57,12 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pRef->refVer = ver;
|
pRef->refVer = ver;
|
||||||
// bsearch in fileSet
|
|
||||||
// SWalFileInfo tmpInfo;
|
|
||||||
// tmpInfo.firstVer = ver;
|
|
||||||
// SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
|
||||||
// ASSERT(pRet != NULL);
|
|
||||||
// pRef->refFile = pRet->firstVer;
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 1
|
|
||||||
void walUnrefVer(SWalRef *pRef) {
|
|
||||||
pRef->refId = -1;
|
|
||||||
// pRef->refFile = -1;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) {
|
SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) {
|
||||||
if (pRef == NULL) {
|
if (pRef == NULL) {
|
||||||
pRef = walOpenRef(pWal);
|
pRef = walOpenRef(pWal);
|
||||||
|
@ -87,12 +73,6 @@ SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) {
|
||||||
taosThreadMutexLock(&pWal->mutex);
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
int64_t ver = walGetFirstVer(pWal);
|
int64_t ver = walGetFirstVer(pWal);
|
||||||
pRef->refVer = ver;
|
pRef->refVer = ver;
|
||||||
// bsearch in fileSet
|
|
||||||
// SWalFileInfo tmpInfo;
|
|
||||||
// tmpInfo.firstVer = ver;
|
|
||||||
// SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
|
||||||
// ASSERT(pRet != NULL);
|
|
||||||
// pRef->refFile = pRet->firstVer;
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
|
wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
|
||||||
|
|
|
@ -145,29 +145,25 @@ class TMQCom:
|
||||||
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
|
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
|
||||||
tdLog.debug("%s is stopped by kill -INT" % (processorName))
|
tdLog.debug("%s is stopped by kill -INT" % (processorName))
|
||||||
|
|
||||||
def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb'):
|
def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb',rows=1):
|
||||||
loopFlag = 1
|
loopFlag = 1
|
||||||
while loopFlag:
|
while loopFlag:
|
||||||
tdSql.query("select * from %s.notifyinfo"%cdbName)
|
tdSql.query("select * from %s.notifyinfo where cmdid = 0"%cdbName)
|
||||||
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
|
||||||
actRows = tdSql.getRows()
|
actRows = tdSql.getRows()
|
||||||
for i in range(actRows):
|
tdLog.info("row: %d"%(actRows))
|
||||||
if tdSql.getData(i, 1) == 0:
|
if (actRows >= rows):
|
||||||
loopFlag = 0
|
loopFlag = 0
|
||||||
break
|
|
||||||
time.sleep(0.02)
|
time.sleep(0.02)
|
||||||
return
|
return
|
||||||
|
|
||||||
def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'):
|
def getStartCommitNotifyFromTmqsim(self,cdbName='cdb',rows=1):
|
||||||
loopFlag = 1
|
loopFlag = 1
|
||||||
while loopFlag:
|
while loopFlag:
|
||||||
tdSql.query("select * from %s.notifyinfo"%cdbName)
|
tdSql.query("select * from %s.notifyinfo where cmdid = 1"%cdbName)
|
||||||
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
|
||||||
actRows = tdSql.getRows()
|
actRows = tdSql.getRows()
|
||||||
for i in range(actRows):
|
tdLog.info("row: %d"%(actRows))
|
||||||
if tdSql.getData(i, 1) == 1:
|
if (actRows >= rows):
|
||||||
loopFlag = 0
|
loopFlag = 0
|
||||||
break
|
|
||||||
time.sleep(0.02)
|
time.sleep(0.02)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
|
@ -100,7 +100,7 @@ class TDTestCase:
|
||||||
|
|
||||||
tdLog.info("wait consumer commit notify")
|
tdLog.info("wait consumer commit notify")
|
||||||
# tmqCom.getStartCommitNotifyFromTmqsim(rows=4)
|
# tmqCom.getStartCommitNotifyFromTmqsim(rows=4)
|
||||||
tmqCom.getStartConsumeNotifyFromTmqsim()
|
tmqCom.getStartConsumeNotifyFromTmqsim(rows=2)
|
||||||
|
|
||||||
tdLog.info("pkill one consume processor")
|
tdLog.info("pkill one consume processor")
|
||||||
tmqCom.stopTmqSimProcess('tmq_sim_new')
|
tmqCom.stopTmqSimProcess('tmq_sim_new')
|
||||||
|
|
Loading…
Reference in New Issue