opti:[TS-4593] transform offset from file to tdb in tmq
This commit is contained in:
parent
591c1acce1
commit
3c6ba2eca1
|
@ -511,7 +511,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
|
||||||
|
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
|
||||||
SEncoder encoder;
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, abuf, len);
|
tEncoderInit(&encoder, abuf, len);
|
||||||
if(tEncodeMqVgOffset(&encoder, &pOffset) < 0) {
|
if(tEncodeMqVgOffset(&encoder, &pOffset) < 0) {
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
@ -3133,7 +3133,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
|
||||||
|
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
|
||||||
SEncoder encoder;
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, abuf, len);
|
tEncoderInit(&encoder, abuf, len);
|
||||||
code = tEncodeMqVgOffset(&encoder, &pOffset);
|
code = tEncodeMqVgOffset(&encoder, &pOffset);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
|
|
|
@ -459,7 +459,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
SEncoder encoder;
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, len);
|
tEncoderInit(&encoder, buf, len);
|
||||||
code = tEncodeMqVgOffset(&encoder, &vgOffset);
|
code = tEncodeMqVgOffset(&encoder, &vgOffset);
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
|
@ -106,7 +106,7 @@ int32_t tqMetaSaveOffset(STQ* pTq, STqOffset* pOffset) {
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
int32_t code = TDB_CODE_SUCCESS;
|
int32_t code = TDB_CODE_SUCCESS;
|
||||||
int32_t vlen;
|
int32_t vlen;
|
||||||
SEncoder encoder;
|
SEncoder encoder = {0};
|
||||||
tEncodeSize(tEncodeSTqOffset, pOffset, vlen, code);
|
tEncodeSize(tEncodeSTqOffset, pOffset, vlen, code);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
goto END;
|
goto END;
|
||||||
|
@ -201,7 +201,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
|
||||||
int32_t code = TDB_CODE_SUCCESS;
|
int32_t code = TDB_CODE_SUCCESS;
|
||||||
int32_t vlen;
|
int32_t vlen;
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
SEncoder encoder;
|
SEncoder encoder = {0};
|
||||||
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
|
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
goto END;
|
goto END;
|
||||||
|
|
|
@ -585,7 +585,7 @@ int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, i
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
tEncodeSize(tEncodeSubmitReq, pSubmitReq, len, code);
|
tEncodeSize(tEncodeSubmitReq, pSubmitReq, len, code);
|
||||||
|
|
||||||
SEncoder encoder;
|
SEncoder encoder = {0};
|
||||||
len += sizeof(SSubmitReq2Msg);
|
len += sizeof(SSubmitReq2Msg);
|
||||||
|
|
||||||
pBuf = rpcMallocCont(len);
|
pBuf = rpcMallocCont(len);
|
||||||
|
@ -1196,7 +1196,7 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SEncoder encoder;
|
SEncoder encoder = {0};
|
||||||
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
|
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
|
||||||
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
|
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
|
||||||
tEncoderInit(&encoder, abuf, len);
|
tEncoderInit(&encoder, abuf, len);
|
||||||
|
|
Loading…
Reference in New Issue