Merge branch 'feat/TS-4243-3.0' of github.com:taosdata/TDengine into TEST/3.0/TS-4243
This commit is contained in:
commit
23cbf247e8
|
@ -101,16 +101,18 @@ typedef struct {
|
|||
int32_t kvRowSize;
|
||||
} SRowBuildScanInfo;
|
||||
|
||||
static FORCE_INLINE void tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) {
|
||||
ASSERT((pTColumn->flags & COL_IS_KEY) == 0);
|
||||
static FORCE_INLINE int32_t tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) {
|
||||
if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
|
||||
sinfo->numOfNone++;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) {
|
||||
ASSERT((pTColumn->flags & COL_IS_KEY) == 0);
|
||||
static FORCE_INLINE int32_t tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) {
|
||||
if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
|
||||
sinfo->numOfNull++;
|
||||
sinfo->kvMaxOffset = sinfo->kvPayloadSize;
|
||||
sinfo->kvPayloadSize += tPutI16v(NULL, -pTColumn->colId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tRowBuildScanAddValue(SRowBuildScanInfo *sinfo, SColVal *colVal, const STColumn *pTColumn) {
|
||||
|
@ -142,6 +144,7 @@ static FORCE_INLINE void tRowBuildScanAddValue(SRowBuildScanInfo *sinfo, SColVal
|
|||
}
|
||||
|
||||
static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildScanInfo *sinfo) {
|
||||
int32_t code = 0;
|
||||
int32_t colValIndex = 1;
|
||||
int32_t numOfColVals = TARRAY_SIZE(colVals);
|
||||
SColVal *colValArray = (SColVal *)TARRAY_DATA(colVals);
|
||||
|
@ -158,7 +161,7 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS
|
|||
for (int32_t i = 1; i < schema->numOfCols; i++) {
|
||||
for (;;) {
|
||||
if (colValIndex >= numOfColVals) {
|
||||
tRowBuildScanAddNone(sinfo, schema->columns + i);
|
||||
if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit;
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -168,15 +171,15 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS
|
|||
if (COL_VAL_IS_VALUE(&colValArray[colValIndex])) {
|
||||
tRowBuildScanAddValue(sinfo, &colValArray[colValIndex], schema->columns + i);
|
||||
} else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) {
|
||||
tRowBuildScanAddNull(sinfo, schema->columns + i);
|
||||
if ((code = tRowBuildScanAddNull(sinfo, schema->columns + i))) goto _exit;
|
||||
} else if (COL_VAL_IS_NONE(&colValArray[colValIndex])) {
|
||||
tRowBuildScanAddNone(sinfo, schema->columns + i);
|
||||
if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit;
|
||||
}
|
||||
|
||||
colValIndex++;
|
||||
break;
|
||||
} else if (colValArray[colValIndex].cid > schema->columns[i].colId) {
|
||||
tRowBuildScanAddNone(sinfo, schema->columns + i);
|
||||
if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit;
|
||||
break;
|
||||
} else { // skip useless value
|
||||
colValIndex++;
|
||||
|
@ -250,7 +253,8 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS
|
|||
+ sinfo->kvIndexSize // index array
|
||||
+ sinfo->kvPayloadSize; // payload
|
||||
|
||||
return 0;
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tRowBuildTupleRow(SArray *aColVal, const SRowBuildScanInfo *sinfo, const STSchema *schema,
|
||||
|
|
|
@ -53,6 +53,13 @@ SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList,
|
|||
return pLoadInfo;
|
||||
}
|
||||
|
||||
static void freeItem(void* pValue) {
|
||||
SValue* p = (SValue*) pValue;
|
||||
if (IS_VAR_DATA_TYPE(p->type)) {
|
||||
taosMemoryFree(p->pData);
|
||||
}
|
||||
}
|
||||
|
||||
void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
||||
if (pLoadInfo == NULL) {
|
||||
return NULL;
|
||||
|
@ -72,8 +79,8 @@ void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
|||
|
||||
if (pLoadInfo->info.pCount != NULL) {
|
||||
taosArrayDestroy(pLoadInfo->info.pUid);
|
||||
taosArrayDestroy(pLoadInfo->info.pFirstKey);
|
||||
taosArrayDestroy(pLoadInfo->info.pLastKey);
|
||||
taosArrayDestroyEx(pLoadInfo->info.pFirstKey, freeItem);
|
||||
taosArrayDestroyEx(pLoadInfo->info.pLastKey, freeItem);
|
||||
taosArrayDestroy(pLoadInfo->info.pCount);
|
||||
taosArrayDestroy(pLoadInfo->info.pFirstTs);
|
||||
taosArrayDestroy(pLoadInfo->info.pLastTs);
|
||||
|
@ -319,6 +326,21 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tValueDupPayload(SValue *pVal) {
|
||||
if (IS_VAR_DATA_TYPE(pVal->type)) {
|
||||
char *p = (char *)pVal->pData;
|
||||
char *pBuf = taosMemoryMalloc(pVal->nData);
|
||||
if (pBuf == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
memcpy(pBuf, p, pVal->nData);
|
||||
pVal->pData = (uint8_t *)pBuf;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo,
|
||||
TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) {
|
||||
int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray);
|
||||
|
@ -384,25 +406,16 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
|||
break;
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(vFirst.type)) {
|
||||
char *p = (char *)vFirst.pData;
|
||||
char *pBuf = taosMemoryMalloc(vFirst.nData);
|
||||
memcpy(pBuf, p, vFirst.nData);
|
||||
vFirst.pData = (uint8_t *)pBuf;
|
||||
}
|
||||
tValueDupPayload(&vFirst);
|
||||
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst);
|
||||
|
||||
// todo add api to clone the original data
|
||||
code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast);
|
||||
if (code) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(vLast.type)) {
|
||||
char *p = (char *)vLast.pData;
|
||||
char *pBuf = taosMemoryMalloc(vLast.nData);
|
||||
memcpy(pBuf, p, vLast.nData);
|
||||
vLast.pData = (uint8_t *)pBuf;
|
||||
}
|
||||
tValueDupPayload(&vLast);
|
||||
taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast);
|
||||
}
|
||||
|
||||
|
@ -420,8 +433,15 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
|||
taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts);
|
||||
taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts);
|
||||
|
||||
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &record.firstKey.pks[0]);
|
||||
taosArrayPush(pBlockLoadInfo->info.pLastKey, &record.lastKey.pks[0]);
|
||||
SValue s = record.firstKey.pks[0];
|
||||
tValueDupPayload(&s);
|
||||
|
||||
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &s);
|
||||
|
||||
s = record.lastKey.pks[0];
|
||||
tValueDupPayload(&s);
|
||||
|
||||
taosArrayPush(pBlockLoadInfo->info.pLastKey, &s);
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pR
|
|||
int32_t rowIndex);
|
||||
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
|
||||
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order,
|
||||
SVersionRange* pVerRange);
|
||||
SVersionRange* pVerRange, bool hasPk);
|
||||
|
||||
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
|
||||
TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow);
|
||||
|
@ -1595,7 +1595,8 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
|
|||
tColRowGetKeyDeepCopy(pRow->pBlockData, pRow->iRow, pkSrcSlot, pNextProc);
|
||||
|
||||
if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) {
|
||||
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange)) {
|
||||
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange,
|
||||
pSttBlockReader->numOfPks > 0)) {
|
||||
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
|
||||
return true;
|
||||
}
|
||||
|
@ -2135,7 +2136,7 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable
|
|||
|
||||
if (pBlockScanInfo->delSkyline != NULL && TARRAY_SIZE(pBlockScanInfo->delSkyline) > 0) {
|
||||
bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pInfo->order,
|
||||
&pInfo->verRange);
|
||||
&pInfo->verRange, pReader->suppInfo.numOfPks > 0);
|
||||
if (dropped) {
|
||||
return false;
|
||||
}
|
||||
|
@ -3381,8 +3382,35 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_
|
|||
return (SVersionRange){.minVer = startVer, .maxVer = endVer};
|
||||
}
|
||||
|
||||
static int32_t reverseSearchStartPos(const SArray* pDelList, int32_t index, int64_t key, bool asc) {
|
||||
size_t num = taosArrayGetSize(pDelList);
|
||||
int32_t start = index;
|
||||
|
||||
if (asc) {
|
||||
if (start >= num - 1) {
|
||||
start = num - 1;
|
||||
}
|
||||
|
||||
TSDBKEY* p = taosArrayGet(pDelList, start);
|
||||
while (p->ts >= key && start > 0) {
|
||||
start -= 1;
|
||||
}
|
||||
} else {
|
||||
if (index <= 0) {
|
||||
start = 0;
|
||||
}
|
||||
|
||||
TSDBKEY* p = taosArrayGet(pDelList, start);
|
||||
while (p->ts <= key && start < num - 1) {
|
||||
start += 1;
|
||||
}
|
||||
}
|
||||
|
||||
return start;
|
||||
}
|
||||
|
||||
bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order,
|
||||
SVersionRange* pVerRange) {
|
||||
SVersionRange* pVerRange, bool hasPk) {
|
||||
if (pDelList == NULL || (TARRAY_SIZE(pDelList) == 0)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -3391,6 +3419,10 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t
|
|||
bool asc = ASCENDING_TRAVERSE(order);
|
||||
int32_t step = asc ? 1 : -1;
|
||||
|
||||
if (hasPk) { // handle the case where duplicated timestamps existed.
|
||||
*index = reverseSearchStartPos(pDelList, *index, key, asc);
|
||||
}
|
||||
|
||||
if (asc) {
|
||||
if (*index >= num - 1) {
|
||||
TSDBKEY* last = taosArrayGetLast(pDelList);
|
||||
|
@ -3503,7 +3535,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
|
|||
if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) {
|
||||
return pRow;
|
||||
} else {
|
||||
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange);
|
||||
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0);
|
||||
if (!dropped) {
|
||||
return pRow;
|
||||
}
|
||||
|
@ -3528,7 +3560,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
|
|||
if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) {
|
||||
return pRow;
|
||||
} else {
|
||||
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange);
|
||||
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0);
|
||||
if (!dropped) {
|
||||
return pRow;
|
||||
}
|
||||
|
|
|
@ -216,11 +216,6 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
|
|||
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
|
||||
ASSERT(pColInfoData->info.type == pCol->type);
|
||||
if (colDataIsNull_s(pColInfoData, j)) {
|
||||
if ((pCol->flags & COL_IS_KEY)) {
|
||||
qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type);
|
||||
terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
|
||||
goto _end;
|
||||
}
|
||||
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
|
||||
taosArrayPush(pVals, &cv);
|
||||
} else {
|
||||
|
@ -248,11 +243,6 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
|
|||
terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
|
||||
goto _end;
|
||||
}
|
||||
if ((pCol->flags & COL_IS_KEY)) {
|
||||
qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type);
|
||||
terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type
|
||||
taosArrayPush(pVals, &cv);
|
||||
|
|
|
@ -1657,9 +1657,6 @@ static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, STo
|
|||
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
|
||||
return buildSyntaxErrMsg(&pCxt->msg, "Primary timestamp column should not be null", pToken->z);
|
||||
}
|
||||
if (pSchema->flags & COL_IS_KEY) {
|
||||
return buildSyntaxErrMsg(&pCxt->msg, "Primary key column should not be null", pToken->z);
|
||||
}
|
||||
|
||||
pVal->flag = CV_FLAG_NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -267,11 +267,6 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in
|
|||
pBind = bind + c;
|
||||
}
|
||||
|
||||
if(pBind->is_null && (pColSchema->flags & COL_IS_KEY)){
|
||||
code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null");
|
||||
goto _return;
|
||||
}
|
||||
|
||||
code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1);
|
||||
if (code) {
|
||||
goto _return;
|
||||
|
@ -318,11 +313,6 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
|
|||
pBind = bind;
|
||||
}
|
||||
|
||||
if (pBind->is_null && (pColSchema->flags & COL_IS_KEY)) {
|
||||
code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null");
|
||||
goto _return;
|
||||
}
|
||||
|
||||
tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1);
|
||||
|
||||
qDebug("stmt col %d bind %d rows data", colIdx, rowNum);
|
||||
|
|
|
@ -8221,7 +8221,7 @@ static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList** ppC
|
|||
|
||||
if (TSDB_CODE_SUCCESS == code && !hasPrimaryKey && hasPkInTable(pMeta)) {
|
||||
code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM,
|
||||
"Primary key column of dest table can not be null");
|
||||
"Primary key column name must be defined in existed-stable field");
|
||||
}
|
||||
|
||||
SNodeList* pNewProjections = NULL;
|
||||
|
|
Loading…
Reference in New Issue