enh: add table blokc order check
This commit is contained in:
parent
39628ec6f5
commit
b93b7aa550
|
@ -153,6 +153,8 @@ typedef struct STableDataCxt {
|
||||||
SBoundColInfo boundColsInfo;
|
SBoundColInfo boundColsInfo;
|
||||||
SArray *pValues;
|
SArray *pValues;
|
||||||
SSubmitTbData *pData;
|
SSubmitTbData *pData;
|
||||||
|
TSKEY lastTs;
|
||||||
|
bool ordered;
|
||||||
} STableDataCxt;
|
} STableDataCxt;
|
||||||
|
|
||||||
typedef struct SVgroupDataCxt {
|
typedef struct SVgroupDataCxt {
|
||||||
|
@ -161,6 +163,7 @@ typedef struct SVgroupDataCxt {
|
||||||
} SVgroupDataCxt;
|
} SVgroupDataCxt;
|
||||||
|
|
||||||
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo);
|
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo);
|
||||||
|
void insCheckTableDataOrder(STableDataCxt *pTableCxt, TSKEY tsKey);
|
||||||
int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
|
int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
|
||||||
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt);
|
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt);
|
||||||
int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks);
|
int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks);
|
||||||
|
|
|
@ -1175,6 +1175,9 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
|
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
|
||||||
code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
|
code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code && !isParseBindParam) {
|
if (TSDB_CODE_SUCCESS == code && !isParseBindParam) {
|
||||||
|
|
|
@ -981,6 +981,20 @@ int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void insCheckTableDataOrder(STableDataCxt* pTableCxt, TSKEY tsKey) {
|
||||||
|
// once the data block is disordered, we do NOT keep previous timestamp any more
|
||||||
|
if (!pTableCxt->ordered) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsKey <= pTableCxt->lastTs) {
|
||||||
|
pTableCxt->ordered = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTableCxt->lastTs = tsKey;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
static void destroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); }
|
static void destroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); }
|
||||||
|
|
||||||
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput) {
|
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput) {
|
||||||
|
@ -991,6 +1005,9 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
pTableCxt->ordered = true;
|
||||||
|
pTableCxt->lastTs = 0;
|
||||||
|
|
||||||
pTableCxt->pMeta = tableMetaDup(pTableMeta);
|
pTableCxt->pMeta = tableMetaDup(pTableMeta);
|
||||||
if (NULL == pTableCxt->pMeta) {
|
if (NULL == pTableCxt->pMeta) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1179,7 +1196,9 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
|
||||||
void* p = taosHashIterate(pTableHash, NULL);
|
void* p = taosHashIterate(pTableHash, NULL);
|
||||||
while (TSDB_CODE_SUCCESS == code && NULL != p) {
|
while (TSDB_CODE_SUCCESS == code && NULL != p) {
|
||||||
STableDataCxt* pTableCxt = *(STableDataCxt**)p;
|
STableDataCxt* pTableCxt = *(STableDataCxt**)p;
|
||||||
code = tRowMergeSort(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0);
|
if (pTableCxt->ordered) {
|
||||||
|
code = tRowMergeSort(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
SVgroupDataCxt* pVgCxt = NULL;
|
SVgroupDataCxt* pVgCxt = NULL;
|
||||||
int32_t vgId = pTableCxt->pMeta->vgId;
|
int32_t vgId = pTableCxt->pMeta->vgId;
|
||||||
|
|
Loading…
Reference in New Issue