feat: compatible with older versions of wal
This commit is contained in:
parent
aeb6bf92d1
commit
dc984d92f7
|
@ -3307,6 +3307,12 @@ typedef struct {
|
|||
SArray* aSubmitTbData; // SArray<SSubmitTbData>
|
||||
} SSubmitReq2;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead header;
|
||||
int64_t version;
|
||||
char data[]; // SSubmitReq2
|
||||
} SSubmitReq2Msg;
|
||||
|
||||
int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq);
|
||||
int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2* pReq);
|
||||
void tDestroySSubmitTbData(SSubmitTbData* pTbData, int32_t flag);
|
||||
|
@ -3323,6 +3329,7 @@ void tDestroySSubmitRsp2(SSubmitRsp2* pRsp, int32_t flag);
|
|||
|
||||
#define TSDB_MSG_FLG_ENCODE 0x1
|
||||
#define TSDB_MSG_FLG_DECODE 0x2
|
||||
#define TSDB_MSG_FLG_CMPT 0x3
|
||||
|
||||
typedef struct {
|
||||
union {
|
||||
|
|
|
@ -6995,9 +6995,13 @@ void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (flag == TSDB_MSG_FLG_ENCODE) {
|
||||
if (flag == TSDB_MSG_FLG_ENCODE || flag == TSDB_MSG_FLG_CMPT) {
|
||||
if (pTbData->pCreateTbReq) {
|
||||
if (flag == TSDB_MSG_FLG_ENCODE) {
|
||||
tdDestroySVCreateTbReq(pTbData->pCreateTbReq);
|
||||
} else {
|
||||
tDestroySVCreateTbReq(pTbData->pCreateTbReq, TSDB_MSG_FLG_DECODE);
|
||||
}
|
||||
taosMemoryFree(pTbData->pCreateTbReq);
|
||||
}
|
||||
|
||||
|
|
|
@ -683,13 +683,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||
SPackedData submit = {
|
||||
.msgStr = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)),
|
||||
.msgLen = pHead->bodyLen - sizeof(SMsgHead),
|
||||
.msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
|
||||
.msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
|
||||
.ver = pHead->version,
|
||||
};
|
||||
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
|
||||
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId,
|
||||
TD_VID(pTq->pVnode), req.subKey);
|
||||
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, TD_VID(pTq->pVnode),
|
||||
req.subKey);
|
||||
return -1;
|
||||
}
|
||||
if (taosxRsp.blockNum > 0 /* threshold */) {
|
||||
|
|
|
@ -79,7 +79,7 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
case TDMT_VND_SUBMIT: {
|
||||
int64_t ctime = taosGetTimestampMs();
|
||||
|
||||
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
|
||||
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SSubmitReq2Msg), pMsg->contLen - sizeof(SSubmitReq2Msg));
|
||||
tStartDecode(&dc);
|
||||
|
||||
uint64_t nSubmitTbData;
|
||||
|
@ -256,7 +256,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
break;
|
||||
/* TSDB */
|
||||
case TDMT_VND_SUBMIT:
|
||||
if (vnodeProcessSubmitReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
|
||||
break;
|
||||
case TDMT_VND_DELETE:
|
||||
if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
|
@ -874,6 +874,186 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef struct SSubmitReqConvertCxt {
|
||||
SSubmitMsgIter msgIter;
|
||||
SSubmitBlk *pBlock;
|
||||
SSubmitBlkIter blkIter;
|
||||
STSRow *pRow;
|
||||
STSRowIter rowIter;
|
||||
SSubmitTbData *pTbData;
|
||||
STSchema *pTbSchema;
|
||||
SArray *pColValues;
|
||||
} SSubmitReqConvertCxt;
|
||||
|
||||
static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) {
|
||||
taosMemoryFreeClear(pCxt->pTbSchema);
|
||||
pCxt->pTbSchema = metaGetTbTSchema(pMeta, pCxt->msgIter.suid, pCxt->msgIter.sversion, 1);
|
||||
if (NULL == pCxt->pTbSchema) {
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
tdSTSRowIterInit(&pCxt->rowIter, pCxt->pTbSchema);
|
||||
|
||||
tDestroySSubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
|
||||
if (NULL == pCxt->pTbData) {
|
||||
pCxt->pTbData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
|
||||
if (NULL == pCxt->pTbData) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
pCxt->pTbData->flags = 0;
|
||||
pCxt->pTbData->suid = pCxt->msgIter.suid;
|
||||
pCxt->pTbData->uid = pCxt->msgIter.uid;
|
||||
pCxt->pTbData->sver = pCxt->msgIter.sversion;
|
||||
pCxt->pTbData->pCreateTbReq = NULL;
|
||||
pCxt->pTbData->aRowP = taosArrayInit(128, POINTER_BYTES);
|
||||
if (NULL == pCxt->pTbData->aRowP) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
taosArrayDestroy(pCxt->pColValues);
|
||||
pCxt->pColValues = taosArrayInit(pCxt->pTbSchema->numOfCols, sizeof(SColVal));
|
||||
if (NULL == pCxt->pColValues) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
for (int32_t i = 0; i < pCxt->pTbSchema->numOfCols; ++i) {
|
||||
SColVal val = COL_VAL_NONE(pCxt->pTbSchema->columns[i].colId, pCxt->pTbSchema->columns[i].type);
|
||||
taosArrayPush(pCxt->pColValues, &val);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void vnodeDestroySubmitReqConvertCxt(SSubmitReqConvertCxt *pCxt) {
|
||||
taosMemoryFreeClear(pCxt->pTbSchema);
|
||||
tDestroySSubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
|
||||
taosMemoryFreeClear(pCxt->pTbData);
|
||||
taosArrayDestroy(pCxt->pColValues);
|
||||
}
|
||||
|
||||
static int32_t vnodeCellValConvertToColVal(STColumn *pCol, SCellVal *pCellVal, SColVal *pColVal) {
|
||||
if (tdValTypeIsNone(pCellVal->valType)) {
|
||||
pColVal->flag = CV_FLAG_NONE;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (tdValTypeIsNull(pCellVal->valType)) {
|
||||
pColVal->flag = CV_FLAG_NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
pColVal->value.nData = varDataLen(pCellVal->val);
|
||||
pColVal->value.pData = varDataVal(pCellVal->val);
|
||||
} else if (TSDB_DATA_TYPE_FLOAT == pCol->type) {
|
||||
float f = GET_FLOAT_VAL(pCellVal->val);
|
||||
memcpy(&pColVal->value.val, &f, sizeof(f));
|
||||
} else if (TSDB_DATA_TYPE_DOUBLE == pCol->type) {
|
||||
pColVal->value.val = *(int64_t *)pCellVal->val;
|
||||
} else {
|
||||
GET_TYPED_DATA(pColVal->value.val, int64_t, pCol->type, pCellVal->val);
|
||||
}
|
||||
|
||||
pColVal->flag = CV_FLAG_VALUE;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t vnodeTSRowConvertToColValArray(SSubmitReqConvertCxt *pCxt) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
tdSTSRowIterReset(&pCxt->rowIter, pCxt->pRow);
|
||||
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < pCxt->pTbSchema->numOfCols; ++i) {
|
||||
STColumn *pCol = pCxt->pTbSchema->columns + i;
|
||||
SCellVal cellVal = {0};
|
||||
if (!tdSTSRowIterFetch(&pCxt->rowIter, pCol->colId, pCol->type, &cellVal)) {
|
||||
break;
|
||||
}
|
||||
code = vnodeCellValConvertToColVal(pCol, &cellVal, (SColVal *)taosArrayGet(pCxt->pColValues, i));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t vnodeDecodeCreateTbReq(SSubmitReqConvertCxt *pCxt) {
|
||||
if (pCxt->msgIter.schemaLen <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
pCxt->pTbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
|
||||
if (NULL == pCxt->pTbData->pCreateTbReq) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, pCxt->pBlock->data, pCxt->msgIter.schemaLen);
|
||||
int32_t code = tDecodeSVCreateTbReq(&decoder, pCxt->pTbData->pCreateTbReq);
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t vnodeSubmitReqConvertToSubmitReq2(SVnode *pVnode, SSubmitReq *pReq, SSubmitReq2 *pReq2) {
|
||||
pReq2->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
|
||||
if (NULL == pReq2->aSubmitTbData) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SSubmitReqConvertCxt cxt = {0};
|
||||
|
||||
int32_t code = tInitSubmitMsgIter(pReq, &cxt.msgIter);
|
||||
while (TSDB_CODE_SUCCESS == code) {
|
||||
code = tGetSubmitMsgNext(&cxt.msgIter, &cxt.pBlock);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (NULL == cxt.pBlock) {
|
||||
break;
|
||||
}
|
||||
code = vnodeResetTableCxt(pVnode->pMeta, &cxt);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tInitSubmitBlkIter(&cxt.msgIter, cxt.pBlock, &cxt.blkIter);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = vnodeDecodeCreateTbReq(&cxt);
|
||||
}
|
||||
while (TSDB_CODE_SUCCESS == code && (cxt.pRow = tGetSubmitBlkNext(&cxt.blkIter)) != NULL) {
|
||||
code = vnodeTSRowConvertToColValArray(&cxt);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SRow **pNewRow = taosArrayReserve(cxt.pTbData->aRowP, 1);
|
||||
code = tRowBuild(cxt.pColValues, cxt.pTbSchema, pNewRow);
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = (NULL == taosArrayPush(pReq2->aSubmitTbData, cxt.pTbData) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
taosMemoryFreeClear(cxt.pTbData);
|
||||
}
|
||||
}
|
||||
|
||||
vnodeDestroySubmitReqConvertCxt(&cxt);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
char *pMsg = NULL;
|
||||
uint32_t msglen = 0;
|
||||
tEncodeSize(tEncodeSSubmitReq2, pSubmitReq, msglen, code);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pMsg = taosMemoryMalloc(msglen);
|
||||
if (NULL == pMsg) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, pMsg, msglen);
|
||||
code = tEncodeSSubmitReq2(&encoder, pSubmitReq);
|
||||
tEncoderClear(&encoder);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*ppMsg = pMsg;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
#if 1
|
||||
int32_t code = 0;
|
||||
|
@ -887,7 +1067,19 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
|
||||
pRsp->code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SSubmitReq2Msg *pMsg = (SSubmitReq2Msg *)pReq;
|
||||
if (0 == pMsg->version) {
|
||||
code = vnodeSubmitReqConvertToSubmitReq2(pVnode, (SSubmitReq *)pMsg, pSubmitReq);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = vnodeRebuildSubmitReqMsg(pSubmitReq, &pReq);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _exit;
|
||||
}
|
||||
} else {
|
||||
// decode
|
||||
pReq = POINTER_SHIFT(pReq, sizeof(SSubmitReq2Msg));
|
||||
len -= sizeof(SSubmitReq2Msg);
|
||||
SDecoder dc = {0};
|
||||
tDecoderInit(&dc, pReq, len);
|
||||
if (tDecodeSSubmitReq2(&dc, pSubmitReq) < 0) {
|
||||
|
@ -895,6 +1087,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
goto _exit;
|
||||
}
|
||||
tDecoderClear(&dc);
|
||||
}
|
||||
|
||||
// check
|
||||
code = tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq);
|
||||
|
@ -1040,11 +1233,15 @@ _exit:
|
|||
|
||||
// clear
|
||||
taosArrayDestroy(newTbUids);
|
||||
tDestroySSubmitReq2(pSubmitReq, TSDB_MSG_FLG_DECODE);
|
||||
tDestroySSubmitReq2(pSubmitReq, 0 == pMsg->version ? TSDB_MSG_FLG_CMPT : TSDB_MSG_FLG_DECODE);
|
||||
tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE);
|
||||
|
||||
if (code) terrno = code;
|
||||
|
||||
if (0 == pMsg->version) {
|
||||
taosMemoryFree(pReq);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
#else
|
||||
|
|
|
@ -501,14 +501,15 @@ static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uin
|
|||
tEncodeSize(tEncodeSSubmitReq2, pReq, len, code);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SEncoder encoder;
|
||||
len += sizeof(SMsgHead);
|
||||
len += sizeof(SSubmitReq2Msg);
|
||||
pBuf = taosMemoryMalloc(len);
|
||||
if (NULL == pBuf) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
((SMsgHead*)pBuf)->vgId = htonl(vgId);
|
||||
((SMsgHead*)pBuf)->contLen = htonl(len);
|
||||
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
|
||||
((SSubmitReq2Msg*)pBuf)->header.vgId = htonl(vgId);
|
||||
((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
|
||||
((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
|
||||
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
|
||||
code = tEncodeSSubmitReq2(&encoder, pReq);
|
||||
tEncoderClear(&encoder);
|
||||
}
|
||||
|
@ -679,13 +680,12 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
|||
pStart += BitmapLen(numOfRows);
|
||||
}
|
||||
char* pData = pStart;
|
||||
// uError("rawBlockBindData col bytes:%d, type:%d, size:%d, htonl size:%d", pColSchema->bytes, pColSchema->type, colLength[c], htonl(colLength[c]));
|
||||
|
||||
tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
|
||||
fields += sizeof(int8_t) + sizeof(int32_t);
|
||||
if(needChangeLength) {
|
||||
if (needChangeLength) {
|
||||
pStart += htonl(colLength[c]);
|
||||
}else{
|
||||
} else {
|
||||
pStart += colLength[c];
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue