Merge pull request #27005 from taosdata/opti/TS-4593-3.0
opti:[TS-4593] transform offset from file to tdb in tmq
This commit is contained in:
commit
7e41e8885d
|
@ -511,7 +511,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
|
||||||
|
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
|
||||||
SEncoder encoder;
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, abuf, len);
|
tEncoderInit(&encoder, abuf, len);
|
||||||
if(tEncodeMqVgOffset(&encoder, &pOffset) < 0) {
|
if(tEncodeMqVgOffset(&encoder, &pOffset) < 0) {
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
@ -953,7 +953,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
||||||
tscError("tmqSendHbReq asyncSendMsgToServer failed");
|
tscError("tmqSendHbReq asyncSendMsgToServer failed");
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_val_compare_exchange_8(&pollFlag, 1, 0);
|
(void)atomic_val_compare_exchange_8(&pollFlag, 1, 0);
|
||||||
OVER:
|
OVER:
|
||||||
tDestroySMqHbReq(&req);
|
tDestroySMqHbReq(&req);
|
||||||
if(tmrId != NULL){
|
if(tmrId != NULL){
|
||||||
|
@ -2394,7 +2394,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_val_compare_exchange_8(&pollFlag, 0, 1);
|
(void)atomic_val_compare_exchange_8(&pollFlag, 0, 1);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
tmqHandleAllDelayedTask(tmq);
|
tmqHandleAllDelayedTask(tmq);
|
||||||
|
@ -3133,7 +3133,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
|
||||||
|
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
|
||||||
SEncoder encoder;
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, abuf, len);
|
tEncoderInit(&encoder, abuf, len);
|
||||||
code = tEncodeMqVgOffset(&encoder, &pOffset);
|
code = tEncodeMqVgOffset(&encoder, &pOffset);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
|
|
|
@ -463,7 +463,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
SEncoder encoder;
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, len);
|
tEncoderInit(&encoder, buf, len);
|
||||||
code = tEncodeMqVgOffset(&encoder, &vgOffset);
|
code = tEncodeMqVgOffset(&encoder, &vgOffset);
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
|
@ -106,7 +106,7 @@ int32_t tqMetaSaveOffset(STQ* pTq, STqOffset* pOffset) {
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
int32_t code = TDB_CODE_SUCCESS;
|
int32_t code = TDB_CODE_SUCCESS;
|
||||||
int32_t vlen;
|
int32_t vlen;
|
||||||
SEncoder encoder;
|
SEncoder encoder = {0};
|
||||||
tEncodeSize(tEncodeSTqOffset, pOffset, vlen, code);
|
tEncodeSize(tEncodeSTqOffset, pOffset, vlen, code);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
goto END;
|
goto END;
|
||||||
|
@ -201,7 +201,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
|
||||||
int32_t code = TDB_CODE_SUCCESS;
|
int32_t code = TDB_CODE_SUCCESS;
|
||||||
int32_t vlen;
|
int32_t vlen;
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
SEncoder encoder;
|
SEncoder encoder = {0};
|
||||||
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
|
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
goto END;
|
goto END;
|
||||||
|
|
|
@ -595,7 +595,7 @@ int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, i
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
tEncodeSize(tEncodeSubmitReq, pSubmitReq, len, code);
|
tEncodeSize(tEncodeSubmitReq, pSubmitReq, len, code);
|
||||||
|
|
||||||
SEncoder encoder;
|
SEncoder encoder = {0};
|
||||||
len += sizeof(SSubmitReq2Msg);
|
len += sizeof(SSubmitReq2Msg);
|
||||||
|
|
||||||
pBuf = rpcMallocCont(len);
|
pBuf = rpcMallocCont(len);
|
||||||
|
@ -1230,7 +1230,7 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SEncoder encoder;
|
SEncoder encoder = {0};
|
||||||
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
|
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
|
||||||
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
|
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
|
||||||
tEncoderInit(&encoder, abuf, len);
|
tEncoderInit(&encoder, abuf, len);
|
||||||
|
|
Loading…
Reference in New Issue