opti:[TS-4593] transform offset from file to tdb in tmq
This commit is contained in:
parent
e78a75d183
commit
1e1612a5fa
|
@ -655,7 +655,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
|
|
||||||
taosRLockLatch(&pTq->lock);
|
taosRLockLatch(&pTq->lock);
|
||||||
STqHandle* pHandle = NULL;
|
STqHandle* pHandle = NULL;
|
||||||
ret = tqMetaGetHandle(pTq, req.subKey, &pHandle);
|
(void)tqMetaGetHandle(pTq, req.subKey, &pHandle); //ignore return code
|
||||||
taosRUnLockLatch(&pTq->lock);
|
taosRUnLockLatch(&pTq->lock);
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
if (req.oldConsumerId != -1) {
|
if (req.oldConsumerId != -1) {
|
||||||
|
|
|
@ -391,7 +391,7 @@ int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle) {
|
||||||
int vLen = 0;
|
int vLen = 0;
|
||||||
if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &data, &vLen) < 0) {
|
if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &data, &vLen) < 0) {
|
||||||
tdbFree(data);
|
tdbFree(data);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
|
||||||
}
|
}
|
||||||
STqHandle handle = {0};
|
STqHandle handle = {0};
|
||||||
if (tqMetaRestoreHandle(pTq, data, vLen, &handle) != 0){
|
if (tqMetaRestoreHandle(pTq, data, vLen, &handle) != 0){
|
||||||
|
@ -404,7 +404,7 @@ int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle) {
|
||||||
tqDestroyTqHandle(&handle);
|
tqDestroyTqHandle(&handle);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
*pHandle = taosHashGet(pTq->pCheckInfo, key, strlen(key));
|
*pHandle = taosHashGet(pTq->pHandle, key, strlen(key));
|
||||||
if(*pHandle == NULL){
|
if(*pHandle == NULL){
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue