enh: replace row format

This commit is contained in:
Xiaoyu Wang 2022-11-28 00:08:41 +08:00
parent 0c137f3a79
commit faf156105d
5 changed files with 77 additions and 34 deletions

View File

@ -3250,6 +3250,7 @@ typedef struct {
int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq); int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2** ppReq); int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2** ppReq);
void tDestroySSubmitTbData(SSubmitTbData* pTbData);
void tDestroySSubmitReq2(SSubmitReq2* pReq); void tDestroySSubmitReq2(SSubmitReq2* pReq);
#pragma pack(pop) #pragma pack(pop)

View File

@ -6830,6 +6830,10 @@ _exit:
return 0; return 0;
} }
void tDestroySSubmitTbData(SSubmitTbData *pTbData) {
// todo
}
void tDestroySSubmitReq2(SSubmitReq2 *pReq) { void tDestroySSubmitReq2(SSubmitReq2 *pReq) {
if (NULL == pReq) return; if (NULL == pReq) return;

View File

@ -153,12 +153,12 @@ typedef struct STableDataCxt {
SBoundColInfo boundColsInfo; SBoundColInfo boundColsInfo;
SArray *pValues; SArray *pValues;
SVCreateTbReq *pCreateTblReq; SVCreateTbReq *pCreateTblReq;
SSubmitTbData data; SSubmitTbData *pData;
} STableDataCxt; } STableDataCxt;
typedef struct SVgroupDataCxt { typedef struct SVgroupDataCxt {
int32_t vgId; int32_t vgId;
SSubmitReq2 data; SSubmitReq2 *pData;
} SVgroupDataCxt; } SVgroupDataCxt;
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo); int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo);

View File

@ -1100,6 +1100,7 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql,
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
pVal->flag = CV_FLAG_VALUE;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1167,7 +1168,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SRow** pRow = taosArrayReserve(pTableCxt->data.aRowP, 1); SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
} }

View File

@ -1014,9 +1014,17 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
} }
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pTableCxt->data.aRowP = taosArrayInit(128, POINTER_BYTES); pTableCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
if (NULL == pTableCxt->data.aRowP) { if (NULL == pTableCxt->pData) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} else {
pTableCxt->pData->suid = pTableMeta->suid;
pTableCxt->pData->uid = pTableMeta->uid;
pTableCxt->pData->sver = pTableMeta->sversion;
pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES);
if (NULL == pTableCxt->pData->aRowP) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
} }
} }
@ -1055,7 +1063,7 @@ void insDestroyTableDataCxt(STableDataCxt* pTableCxt) {
taosArrayDestroyEx(pTableCxt->pValues, NULL /*todo*/); taosArrayDestroyEx(pTableCxt->pValues, NULL /*todo*/);
tdDestroySVCreateTbReq(pTableCxt->pCreateTblReq); tdDestroySVCreateTbReq(pTableCxt->pCreateTblReq);
taosMemoryFreeClear(pTableCxt->pCreateTblReq); taosMemoryFreeClear(pTableCxt->pCreateTblReq);
// todo free SSubmitTbData tDestroySSubmitTbData(pTableCxt->pData);
} }
void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) { void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
@ -1063,7 +1071,7 @@ void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
return; return;
} }
tDestroySSubmitReq2(&pVgCxt->data); tDestroySSubmitReq2(pVgCxt->pData);
} }
void insDestroyVgroupDataCxtList(SArray* pVgCxtList) { void insDestroyVgroupDataCxtList(SArray* pVgCxtList) {
@ -1112,26 +1120,50 @@ void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) {
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt) { static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt) {
if (NULL != pTableCxt->pCreateTblReq) { if (NULL != pTableCxt->pCreateTblReq) {
if (NULL == pVgCxt->data.aCreateTbReq) { if (NULL == pVgCxt->pData->aCreateTbReq) {
pVgCxt->data.aCreateTbReq = taosArrayInit(128, sizeof(SVCreateTbReq)); pVgCxt->pData->aCreateTbReq = taosArrayInit(128, sizeof(SVCreateTbReq));
if (NULL == pVgCxt->data.aCreateTbReq) { if (NULL == pVgCxt->pData->aCreateTbReq) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
} }
taosArrayPush(pVgCxt->data.aCreateTbReq, pTableCxt->pCreateTblReq); taosArrayPush(pVgCxt->pData->aCreateTbReq, pTableCxt->pCreateTblReq);
} }
if (NULL == pVgCxt->data.aSubmitTbData) { if (NULL == pVgCxt->pData->aSubmitTbData) {
pVgCxt->data.aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData)); pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
if (NULL == pVgCxt->data.aSubmitTbData) { if (NULL == pVgCxt->pData->aSubmitTbData) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
} }
taosArrayPush(pVgCxt->data.aSubmitTbData, &pTableCxt->data); taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData);
pTableCxt->pData = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHash, SArray* pVgroupList,
SVgroupDataCxt** pOutput) {
SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt));
if (NULL == pVgCxt) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pVgCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitReq2));
if (NULL == pVgCxt->pData) {
insDestroyVgroupDataCxt(pVgCxt);
return TSDB_CODE_OUT_OF_MEMORY;
}
pVgCxt->vgId = pTableCxt->pMeta->vgId;
int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
if (TSDB_CODE_SUCCESS == code) {
taosArrayPush(pVgroupList, &pVgCxt);
*pOutput = pVgCxt;
} else {
insDestroyVgroupDataCxt(pVgCxt);
}
return code;
}
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) { int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES); SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES);
@ -1146,17 +1178,14 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
void* p = taosHashIterate(pTableHash, NULL); void* p = taosHashIterate(pTableHash, NULL);
while (TSDB_CODE_SUCCESS == code && NULL != p) { while (TSDB_CODE_SUCCESS == code && NULL != p) {
STableDataCxt* pTableCxt = *(STableDataCxt**)p; STableDataCxt* pTableCxt = *(STableDataCxt**)p;
code = tRowMergeSort(pTableCxt->data.aRowP, pTableCxt->pSchema, 0); code = tRowMergeSort(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
int32_t vgId = pTableCxt->pMeta->vgId; int32_t vgId = pTableCxt->pMeta->vgId;
SVgroupDataCxt* pVgCxt = taosHashGet(pVgroupHash, &vgId, sizeof(vgId)); SVgroupDataCxt* pVgCxt = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
if (NULL == pVgCxt) { if (NULL == pVgCxt) {
SVgroupDataCxt vgCxt = {.vgId = vgId}; code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
code = fillVgroupDataCxt(pTableCxt, &vgCxt); }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = taosHashPut(pVgroupHash, &vgId, sizeof(vgId), &vgCxt, sizeof(vgCxt));
}
} else {
code = fillVgroupDataCxt(pTableCxt, pVgCxt); code = fillVgroupDataCxt(pTableCxt, pVgCxt);
} }
} }
@ -1175,22 +1204,30 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
return code; return code;
} }
static int32_t buildSubmitReq(SSubmitReq2* pReq, void** pData, uint32_t* pLen) { static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
tEncodeSize(tEncodeSSubmitReq2, pReq, *pLen, code); uint32_t len = 0;
void* pBuf = NULL;
tEncodeSize(tEncodeSSubmitReq2, pReq, len, code);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SEncoder encoder; SEncoder encoder;
*pData = taosMemoryMalloc(*pLen); len += sizeof(SMsgHead);
if (NULL == *pData) { pBuf = taosMemoryMalloc(len);
if (NULL == pBuf) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
tEncoderInit(&encoder, *pData, *pLen); ((SMsgHead*)pBuf)->vgId = htonl(vgId);
((SMsgHead*)pBuf)->contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
code = tEncodeSSubmitReq2(&encoder, pReq); code = tEncodeSSubmitReq2(&encoder, pReq);
tEncoderClear(&encoder); tEncoderClear(&encoder);
} }
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFreeClear(*pData); if (TSDB_CODE_SUCCESS == code) {
*pLen = 0; *pData = pBuf;
*pLen = len;
} else {
taosMemoryFree(pBuf);
} }
return code; return code;
} }
@ -1210,11 +1247,11 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
dst->numOfTables = taosArrayGetSize(src->data.aSubmitTbData); dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg); code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildSubmitReq(&src->data, &dst->pData, &dst->size); code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = (NULL == taosArrayPush(pDataBlocks, &dst) ? TSDB_CODE_TSC_OUT_OF_MEMORY : TSDB_CODE_SUCCESS); code = (NULL == taosArrayPush(pDataBlocks, &dst) ? TSDB_CODE_TSC_OUT_OF_MEMORY : TSDB_CODE_SUCCESS);