Merge branch 'feat/TS-4243-3.0' into enh/TD-28945-pk

This commit is contained in:
Minglei Jin 2024-04-12 10:32:25 +08:00
commit 16b65b7311
23 changed files with 393 additions and 359 deletions

View File

@ -767,6 +767,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_COL_PK_TYPE TAOS_DEF_ERROR_CODE(0, 0x2673) #define TSDB_CODE_PAR_COL_PK_TYPE TAOS_DEF_ERROR_CODE(0, 0x2673)
#define TSDB_CODE_PAR_INVALID_PK_OP TAOS_DEF_ERROR_CODE(0, 0x2674) #define TSDB_CODE_PAR_INVALID_PK_OP TAOS_DEF_ERROR_CODE(0, 0x2674)
#define TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x2675) #define TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x2675)
#define TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE TAOS_DEF_ERROR_CODE(0, 0x2676)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
//planner //planner

View File

@ -1331,6 +1331,13 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
return NULL; return NULL;
} }
if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) {
uInfo("1====free pk:%p, %p pBlock", pBlock->info.pks[0].pData, pBlock);
uInfo("2====free pk:%p, %p pBlock", pBlock->info.pks[1].pData, pBlock);
taosMemoryFreeClear(pBlock->info.pks[0].pData);
taosMemoryFreeClear(pBlock->info.pks[1].pData);
}
blockDataFreeRes(pBlock); blockDataFreeRes(pBlock);
taosMemoryFreeClear(pBlock); taosMemoryFreeClear(pBlock);
return NULL; return NULL;
@ -1478,6 +1485,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
SSDataBlock* pBlock = createDataBlock(); SSDataBlock* pBlock = createDataBlock();
pBlock->info = pDataBlock->info; pBlock->info = pDataBlock->info;
pBlock->info.rows = 0; pBlock->info.rows = 0;
pBlock->info.capacity = 0; pBlock->info.capacity = 0;
pBlock->info.rowSize = 0; pBlock->info.rowSize = 0;
@ -1497,10 +1505,18 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
pVal->type = pDataBlock->info.pks[0].type; pVal->type = pDataBlock->info.pks[0].type;
pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData); pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData);
pVal->nData = pDataBlock->info.pks[0].nData;
memcpy(pVal->pData, pDataBlock->info.pks[0].pData, pVal->nData);
pVal = &pBlock->info.pks[1]; SValue* p = &pBlock->info.pks[1];
pVal->type = pDataBlock->info.pks[1].type; p->type = pDataBlock->info.pks[1].type;
pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); p->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData);
p->nData = pDataBlock->info.pks[1].nData;
memcpy(p->pData, pDataBlock->info.pks[1].pData, p->nData);
uInfo("===========clone block, with varchar, %p, 0---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[0].pData, pDataBlock, pDataBlock->info.pks[0].pData);
uInfo("===========clone block, with varchar, %p, 1---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[1].pData, pDataBlock, pDataBlock->info.pks[1].pData);
} else {
uInfo("===========clone block without varchar pk, %p, src:%p", pBlock, pDataBlock);
} }
if (copyData) { if (copyData) {

View File

@ -101,16 +101,18 @@ typedef struct {
int32_t kvRowSize; int32_t kvRowSize;
} SRowBuildScanInfo; } SRowBuildScanInfo;
static FORCE_INLINE void tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { static FORCE_INLINE int32_t tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) {
ASSERT((pTColumn->flags & COL_IS_KEY) == 0); if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE;
sinfo->numOfNone++; sinfo->numOfNone++;
return 0;
} }
static FORCE_INLINE void tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { static FORCE_INLINE int32_t tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) {
ASSERT((pTColumn->flags & COL_IS_KEY) == 0); if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
sinfo->numOfNull++; sinfo->numOfNull++;
sinfo->kvMaxOffset = sinfo->kvPayloadSize; sinfo->kvMaxOffset = sinfo->kvPayloadSize;
sinfo->kvPayloadSize += tPutI16v(NULL, -pTColumn->colId); sinfo->kvPayloadSize += tPutI16v(NULL, -pTColumn->colId);
return 0;
} }
static FORCE_INLINE void tRowBuildScanAddValue(SRowBuildScanInfo *sinfo, SColVal *colVal, const STColumn *pTColumn) { static FORCE_INLINE void tRowBuildScanAddValue(SRowBuildScanInfo *sinfo, SColVal *colVal, const STColumn *pTColumn) {
@ -142,6 +144,7 @@ static FORCE_INLINE void tRowBuildScanAddValue(SRowBuildScanInfo *sinfo, SColVal
} }
static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildScanInfo *sinfo) { static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildScanInfo *sinfo) {
int32_t code = 0;
int32_t colValIndex = 1; int32_t colValIndex = 1;
int32_t numOfColVals = TARRAY_SIZE(colVals); int32_t numOfColVals = TARRAY_SIZE(colVals);
SColVal *colValArray = (SColVal *)TARRAY_DATA(colVals); SColVal *colValArray = (SColVal *)TARRAY_DATA(colVals);
@ -158,7 +161,7 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS
for (int32_t i = 1; i < schema->numOfCols; i++) { for (int32_t i = 1; i < schema->numOfCols; i++) {
for (;;) { for (;;) {
if (colValIndex >= numOfColVals) { if (colValIndex >= numOfColVals) {
tRowBuildScanAddNone(sinfo, schema->columns + i); if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit;
break; break;
} }
@ -168,15 +171,15 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS
if (COL_VAL_IS_VALUE(&colValArray[colValIndex])) { if (COL_VAL_IS_VALUE(&colValArray[colValIndex])) {
tRowBuildScanAddValue(sinfo, &colValArray[colValIndex], schema->columns + i); tRowBuildScanAddValue(sinfo, &colValArray[colValIndex], schema->columns + i);
} else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) { } else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) {
tRowBuildScanAddNull(sinfo, schema->columns + i); if ((code = tRowBuildScanAddNull(sinfo, schema->columns + i))) goto _exit;
} else if (COL_VAL_IS_NONE(&colValArray[colValIndex])) { } else if (COL_VAL_IS_NONE(&colValArray[colValIndex])) {
tRowBuildScanAddNone(sinfo, schema->columns + i); if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit;
} }
colValIndex++; colValIndex++;
break; break;
} else if (colValArray[colValIndex].cid > schema->columns[i].colId) { } else if (colValArray[colValIndex].cid > schema->columns[i].colId) {
tRowBuildScanAddNone(sinfo, schema->columns + i); if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit;
break; break;
} else { // skip useless value } else { // skip useless value
colValIndex++; colValIndex++;
@ -250,7 +253,8 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS
+ sinfo->kvIndexSize // index array + sinfo->kvIndexSize // index array
+ sinfo->kvPayloadSize; // payload + sinfo->kvPayloadSize; // payload
return 0; _exit:
return code;
} }
static int32_t tRowBuildTupleRow(SArray *aColVal, const SRowBuildScanInfo *sinfo, const STSchema *schema, static int32_t tRowBuildTupleRow(SArray *aColVal, const SRowBuildScanInfo *sinfo, const STSchema *schema,

View File

@ -622,8 +622,6 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
} }
SRow* pRow = NULL; SRow* pRow = NULL;
tqInfo("result column flag:%d", pTSchema->columns[1].flags);
code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow); code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE); tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);

View File

@ -57,7 +57,6 @@ static void saveOneRowForLastRaw(SLastCol* pColVal, SCacheRowsReader* pReader, c
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
const int32_t* dstSlotIds, void** pRes, const char* idStr) { const int32_t* dstSlotIds, void** pRes, const char* idStr) {
int32_t numOfRows = pBlock->info.rows; int32_t numOfRows = pBlock->info.rows;
// bool allNullRow = true;
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
uint64_t ts = TSKEY_MIN; uint64_t ts = TSKEY_MIN;
@ -108,11 +107,12 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
} }
} }
// pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to substruct it // pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to subtract it
p->hasResult = true; p->hasResult = true;
varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE); varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE);
colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false); colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false);
} }
for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) { for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx);
if (idx < funcTypeBlockArray->size) { if (idx < funcTypeBlockArray->size) {
@ -233,6 +233,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
if (IS_VAR_DATA_TYPE(pPkCol->type)) { if (IS_VAR_DATA_TYPE(pPkCol->type)) {
p->rowKey.pks[0].pData = taosMemoryCalloc(1, pPkCol->bytes); p->rowKey.pks[0].pData = taosMemoryCalloc(1, pPkCol->bytes);
} }
p->pkColumn = *pPkCol;
} }
if (numOfTables == 0) { if (numOfTables == 0) {
@ -366,15 +368,15 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
goto _end; goto _end;
} }
for (int32_t j = 0; j < pr->numOfCols; ++j) { int32_t pkBufLen = 0;
int32_t bytes; if (pr->rowKey.numOfPKs > 0) {
if (slotIds[j] == -1) { pkBufLen = pr->pkColumn.bytes;
bytes = 1; }
} else {
bytes = pr->pSchema->columns[slotIds[j]].bytes;
}
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + VARSTR_HEADER_SIZE); for (int32_t j = 0; j < pr->numOfCols; ++j) {
int32_t bytes = (slotIds[j] == -1) ? 1 : pr->pSchema->columns[slotIds[j]].bytes;
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + pkBufLen + VARSTR_HEADER_SIZE);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
p->ts = INT64_MIN; p->ts = INT64_MIN;
} }

View File

@ -53,6 +53,13 @@ SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList,
return pLoadInfo; return pLoadInfo;
} }
static void freeItem(void* pValue) {
SValue* p = (SValue*) pValue;
if (IS_VAR_DATA_TYPE(p->type)) {
taosMemoryFree(p->pData);
}
}
void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
if (pLoadInfo == NULL) { if (pLoadInfo == NULL) {
return NULL; return NULL;
@ -72,8 +79,8 @@ void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
if (pLoadInfo->info.pCount != NULL) { if (pLoadInfo->info.pCount != NULL) {
taosArrayDestroy(pLoadInfo->info.pUid); taosArrayDestroy(pLoadInfo->info.pUid);
taosArrayDestroy(pLoadInfo->info.pFirstKey); taosArrayDestroyEx(pLoadInfo->info.pFirstKey, freeItem);
taosArrayDestroy(pLoadInfo->info.pLastKey); taosArrayDestroyEx(pLoadInfo->info.pLastKey, freeItem);
taosArrayDestroy(pLoadInfo->info.pCount); taosArrayDestroy(pLoadInfo->info.pCount);
taosArrayDestroy(pLoadInfo->info.pFirstTs); taosArrayDestroy(pLoadInfo->info.pFirstTs);
taosArrayDestroy(pLoadInfo->info.pLastTs); taosArrayDestroy(pLoadInfo->info.pLastTs);
@ -319,6 +326,21 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tValueDupPayload(SValue *pVal) {
if (IS_VAR_DATA_TYPE(pVal->type)) {
char *p = (char *)pVal->pData;
char *pBuf = taosMemoryMalloc(pVal->nData);
if (pBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(pBuf, p, pVal->nData);
pVal->pData = (uint8_t *)pBuf;
}
return TSDB_CODE_SUCCESS;
}
static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo,
TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) { TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) {
int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray); int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray);
@ -377,37 +399,31 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
rows - i); rows - i);
taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t)), rows - i); taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t)), rows - i);
SValue vFirst = {0}, vLast = {0}; if (block.numOfPKs > 0) {
for (int32_t f = i; f < rows; ++f) { SValue vFirst = {0}, vLast = {0};
int32_t code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst); for (int32_t f = i; f < rows; ++f) {
if (code) { int32_t code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst);
break; if (code) {
} break;
}
if (IS_VAR_DATA_TYPE(vFirst.type)) { tValueDupPayload(&vFirst);
char *p = (char *)vFirst.pData; taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst);
char *pBuf = taosMemoryMalloc(vFirst.nData);
memcpy(pBuf, p, vFirst.nData);
vFirst.pData = (uint8_t *)pBuf;
}
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst);
code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast); // todo add api to clone the original data
if (code) { code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast);
break; if (code) {
} break;
}
if (IS_VAR_DATA_TYPE(vLast.type)) { tValueDupPayload(&vLast);
char *p = (char *)vLast.pData; taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast);
char *pBuf = taosMemoryMalloc(vLast.nData);
memcpy(pBuf, p, vLast.nData);
vLast.pData = (uint8_t *)pBuf;
} }
taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast);
} }
} else { } else {
STbStatisRecord record; STbStatisRecord record = {0};
while (i < rows) { while (i < rows) {
tStatisBlockGet(&block, i, &record); tStatisBlockGet(&block, i, &record);
if (record.suid != suid) { if (record.suid != suid) {
@ -420,8 +436,18 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts); taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts);
taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts); taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts);
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &record.firstKey.pks[0]); if (record.firstKey.numOfPKs > 0) {
taosArrayPush(pBlockLoadInfo->info.pLastKey, &record.lastKey.pks[0]); SValue s = record.firstKey.pks[0];
tValueDupPayload(&s);
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &s);
s = record.lastKey.pks[0];
tValueDupPayload(&s);
taosArrayPush(pBlockLoadInfo->info.pLastKey, &s);
}
i += 1; i += 1;
} }
} }

View File

@ -59,7 +59,7 @@ static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pR
int32_t rowIndex); int32_t rowIndex);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order, static bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order,
SVersionRange* pVerRange); SVersionRange* pVerRange, bool hasPk);
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIterInfo* pIter, SArray* pDelList, static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow); TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow);
@ -67,8 +67,7 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRo
STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow); STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow);
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SRowKey* pKey, static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SRowKey* pKey,
STsdbReader* pReader); STsdbReader* pReader);
static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
STsdbReader* pReader);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost); static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost);
static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr, static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr,
@ -120,7 +119,7 @@ int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
return ret > 0 ? 1 : -1; return ret > 0 ? 1 : -1;
} }
} else { } else {
return comparFn(&p1->pks[0].val, &p2->pks[0].val); return p1->pks[0].val - p2->pks[0].val;
} }
} }
} }
@ -174,12 +173,16 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
for (int32_t i = 0; i < pRow->numOfPKs; i++) { for (int32_t i = 0; i < pRow->numOfPKs; i++) {
pKey->pks[i].type = indices[i].type; pKey->pks[i].type = indices[i].type;
uint8_t *tdata = data + indices[i].offset;
if (pRow->flag >> 4) {
tdata += tGetI16v(tdata, NULL);
}
if (IS_VAR_DATA_TYPE(indices[i].type)) { if (IS_VAR_DATA_TYPE(indices[i].type)) {
tGetU32v(pKey->pks[i].pData, &pKey->pks[i].nData); tdata += tGetU32v(tdata, &pKey->pks[i].nData);
pKey->pks[i].pData = memcpy(pKey->pks[i].pData, data + indices[i].offset, pKey->pks[i].nData); memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData);
pKey->pks[i].pData += pKey->pks[i].nData;
} else { } else {
pKey->pks[i].val = *(int64_t*) (data + indices[i].offset); memcpy(&pKey->pks[i].val, data + indices[i].offset, tDataTypes[pKey->pks[i].type].bytes);
} }
} }
} }
@ -392,19 +395,21 @@ _err:
return code; return code;
} }
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) { bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp) {
return (pSupp->numOfPks > 0) && IS_VAR_DATA_TYPE(pSupp->pk.type);
}
void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool needFree) {
pIter->order = order; pIter->order = order;
pIter->index = -1; pIter->index = -1;
pIter->numOfBlocks = 0; pIter->numOfBlocks = 0;
if (pIter->blockList == NULL) { if (pIter->blockList == NULL) {
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
} else { } else {
taosArrayClear(pIter->blockList); clearDataBlockIterator(pIter, needFree);
} }
} }
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { taosArrayDestroy(pIter->blockList); }
static void initReaderStatus(SReaderStatus* pStatus) { static void initReaderStatus(SReaderStatus* pStatus) {
pStatus->pTableIter = NULL; pStatus->pTableIter = NULL;
pStatus->loadFromFile = true; pStatus->loadFromFile = true;
@ -483,7 +488,7 @@ void tsdbReleaseDataBlock2(STsdbReader* pReader) {
} }
static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacity, SSDataBlock* pResBlock, static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacity, SSDataBlock* pResBlock,
SQueryTableDataCond* pCond) { SQueryTableDataCond* pCond, SBlockLoadSuppInfo* pSup) {
pResBlockInfo->capacity = capacity; pResBlockInfo->capacity = capacity;
pResBlockInfo->pResBlock = pResBlock; pResBlockInfo->pResBlock = pResBlock;
terrno = 0; terrno = 0;
@ -491,6 +496,28 @@ static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacit
if (pResBlockInfo->pResBlock == NULL) { if (pResBlockInfo->pResBlock == NULL) {
pResBlockInfo->freeBlock = true; pResBlockInfo->freeBlock = true;
pResBlockInfo->pResBlock = createResBlock(pCond, pResBlockInfo->capacity); pResBlockInfo->pResBlock = createResBlock(pCond, pResBlockInfo->capacity);
if (pSup->numOfPks > 0) {
SSDataBlock* p = pResBlockInfo->pResBlock;
p->info.pks[0].type = pSup->pk.type;
p->info.pks[1].type = pSup->pk.type;
if (IS_VAR_DATA_TYPE(pSup->pk.type)) {
p->info.pks[0].pData = taosMemoryCalloc(1, pSup->pk.bytes);
if (p->info.pks[0].pData == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
p->info.pks[1].pData = taosMemoryCalloc(1, pSup->pk.bytes);
if (p->info.pks[1].pData == NULL) {
taosMemoryFreeClear(p->info.pks[0].pData);
return TSDB_CODE_OUT_OF_MEMORY;
}
p->info.pks[0].nData = pSup->pk.bytes;
p->info.pks[1].nData = pSup->pk.bytes;
}
}
} else { } else {
pResBlockInfo->freeBlock = false; pResBlockInfo->freeBlock = false;
} }
@ -523,14 +550,9 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL; pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL;
pReader->type = pCond->type; pReader->type = pCond->type;
pReader->bFilesetDelimited = false;
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
if (pCond->numOfCols <= 0) { if (pCond->numOfCols <= 0) {
tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr); tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr);
code = TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
@ -546,6 +568,11 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
pReader->pkComparFn = getComparFunc(pSup->pk.type, 0); pReader->pkComparFn = getComparFunc(pSup->pk.type, 0);
} }
code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond, pSup);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
code = tBlockDataCreate(&pReader->status.fileBlockData); code = tBlockDataCreate(&pReader->status.fileBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
@ -567,8 +594,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
goto _end; goto _end;
} }
pReader->bFilesetDelimited = false;
tsdbInitReaderLock(pReader); tsdbInitReaderLock(pReader);
tsem_init(&pReader->resumeAfterSuspend, 0, 0); tsem_init(&pReader->resumeAfterSuspend, 0, 0);
@ -657,21 +682,19 @@ _end:
return code; return code;
} }
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum,
SArray* pTableScanInfoList) { SArray* pTableScanInfoList) {
size_t sizeInDisk = 0; int32_t k = 0;
int64_t st = taosGetTimestampUs(); size_t sizeInDisk = 0;
int64_t st = taosGetTimestampUs();
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
STimeWindow w = pReader->info.window;
SBrinRecord* pRecord = NULL;
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
SBrinRecordIter iter = {0};
// clear info for the new file // clear info for the new file
cleanupInfoForNextFileset(pReader->status.pTableMap); cleanupInfoForNextFileset(pReader->status.pTableMap);
int32_t k = 0;
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
STimeWindow w = pReader->info.window;
SBrinRecord* pRecord = NULL;
SBrinRecordIter iter = {0};
initBrinRecordIter(&iter, pReader->pFileReader, pIndexList); initBrinRecordIter(&iter, pReader->pFileReader, pIndexList);
while (((pRecord = getNextBrinRecord(&iter)) != NULL)) { while (((pRecord = getNextBrinRecord(&iter)) != NULL)) {
@ -743,14 +766,27 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
} }
if (pScanInfo->pBlockList == NULL) { if (pScanInfo->pBlockList == NULL) {
pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBrinRecord)); pScanInfo->pBlockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
if (pScanInfo->pBlockList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
void* p1 = taosArrayPush(pScanInfo->pBlockList, pRecord); if (pScanInfo->pBlockIdxList == NULL) {
pScanInfo->pBlockIdxList = taosArrayInit(4, sizeof(STableDataBlockIdx));
if (pScanInfo->pBlockIdxList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
SFileDataBlockInfo blockInfo = {.tbBlockIdx = TARRAY_SIZE(pScanInfo->pBlockList)};
recordToBlockInfo(&blockInfo, pRecord);
void* p1 = taosArrayPush(pScanInfo->pBlockList, &blockInfo);
if (p1 == NULL) { if (p1 == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
// todo: refactor to record the fileset skey/ekey
if (pScanInfo->filesetWindow.skey > pRecord->firstKey.key.ts) { if (pScanInfo->filesetWindow.skey > pRecord->firstKey.key.ts) {
pScanInfo->filesetWindow.skey = pRecord->firstKey.key.ts; pScanInfo->filesetWindow.skey = pRecord->firstKey.key.ts;
} }
@ -788,18 +824,13 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// todo keep the the last returned key
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) { static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
// int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
pDumpInfo->allDumped = true; pDumpInfo->allDumped = true;
// ASSERT(0);
// pDumpInfo->lastKey.key.ts = maxKey + step;
} }
static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDataBlockInfo* pInfo, int32_t numOfPks, static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDataBlockInfo* pInfo, int32_t numOfPks,
bool asc) { bool asc) {
pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey; pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey;
pKey->numOfPKs = numOfPks; pKey->numOfPKs = numOfPks;
if (pKey->numOfPKs <= 0) { if (pKey->numOfPKs <= 0) {
return; return;
@ -809,7 +840,7 @@ static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDa
pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val; pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val;
} else { } else {
uint8_t* p = asc ? pBlockInfo->lastPk.pData : pBlockInfo->firstPk.pData; uint8_t* p = asc ? pBlockInfo->lastPk.pData : pBlockInfo->firstPk.pData;
pKey->pks[0].nData = asc ? pBlockInfo->lastPKLen : pBlockInfo->firstPKLen; pKey->pks[0].nData = asc ? varDataLen(pBlockInfo->lastPk.pData) : varDataLen(pBlockInfo->firstPk.pData);
memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData); memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData);
} }
} }
@ -1323,10 +1354,12 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
} }
static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo, static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo,
STableBlockScanInfo* pTableBlockScanInfo, int32_t* nextIndex, int32_t order, STableBlockScanInfo* pScanInfo, int32_t* nextIndex, int32_t order,
SBrinRecord* pRecord) { SBrinRecord* pRecord) {
bool asc = ASCENDING_TRAVERSE(order); bool asc = ASCENDING_TRAVERSE(order);
if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockIdxList) - 1) { int32_t step = asc ? 1 : -1;
if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pScanInfo->pBlockIdxList) - 1) {
return false; return false;
} }
@ -1334,9 +1367,7 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo
return false; return false;
} }
int32_t step = asc ? 1 : -1; STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
STableDataBlockIdx* pTableDataBlockIdx =
taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex); SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex);
blockInfoToRecord(pRecord, p); blockInfoToRecord(pRecord, p);
@ -1344,22 +1375,6 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo
return true; return true;
} }
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;
int32_t index = pBlockIter->index;
while (index < pBlockIter->numOfBlocks && index >= 0) {
SFileDataBlockInfo* pFBlock = taosArrayGet(pBlockIter->blockList, index);
if (pFBlock->uid == pFBlockInfo->uid && pFBlock->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
return index;
}
index += step;
}
return -1;
}
static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index, static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index,
int32_t step) { int32_t step) {
if (index < 0 || index >= pBlockIter->numOfBlocks) { if (index < 0 || index >= pBlockIter->numOfBlocks) {
@ -1595,7 +1610,8 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
tColRowGetKeyDeepCopy(pRow->pBlockData, pRow->iRow, pkSrcSlot, pNextProc); tColRowGetKeyDeepCopy(pRow->pBlockData, pRow->iRow, pkSrcSlot, pNextProc);
if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) { if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) {
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange)) { if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange,
pSttBlockReader->numOfPks > 0)) {
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA; pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
return true; return true;
} }
@ -2135,7 +2151,7 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable
if (pBlockScanInfo->delSkyline != NULL && TARRAY_SIZE(pBlockScanInfo->delSkyline) > 0) { if (pBlockScanInfo->delSkyline != NULL && TARRAY_SIZE(pBlockScanInfo->delSkyline) > 0) {
bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pInfo->order, bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pInfo->order,
&pInfo->verRange); &pInfo->verRange, pReader->suppInfo.numOfPks > 0);
if (dropped) { if (dropped) {
return false; return false;
} }
@ -2705,7 +2721,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
} }
if (taosArrayGetSize(pIndexList) > 0 || pReader->status.pCurrentFileset->lvlArr->size > 0) { if (taosArrayGetSize(pIndexList) > 0 || pReader->status.pCurrentFileset->lvlArr->size > 0) {
code = doLoadFileBlock(pReader, pIndexList, pBlockNum, pTableList); code = loadFileBlockBrinInfo(pReader, pIndexList, pBlockNum, pTableList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pIndexList); taosArrayDestroy(pIndexList);
return code; return code;
@ -2820,11 +2836,11 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn
pInfo->pks[0].val = pBlockInfo->firstPk.val; pInfo->pks[0].val = pBlockInfo->firstPk.val;
pInfo->pks[1].val = pBlockInfo->lastPk.val; pInfo->pks[1].val = pBlockInfo->lastPk.val;
} else { } else {
memcpy(pInfo->pks[0].pData, pBlockInfo->firstPk.pData, pBlockInfo->firstPKLen); memcpy(pInfo->pks[0].pData, varDataVal(pBlockInfo->firstPk.pData), varDataLen(pBlockInfo->firstPk.pData));
memcpy(pInfo->pks[1].pData, pBlockInfo->lastPk.pData, pBlockInfo->lastPKLen); memcpy(pInfo->pks[1].pData, varDataVal(pBlockInfo->lastPk.pData), varDataLen(pBlockInfo->lastPk.pData));
pInfo->pks[0].nData = pBlockInfo->firstPKLen; pInfo->pks[0].nData = varDataLen(pBlockInfo->firstPk.pData);
pInfo->pks[1].nData = pBlockInfo->lastPKLen; pInfo->pks[1].nData = varDataLen(pBlockInfo->lastPk.pData);
} }
} }
@ -3153,23 +3169,14 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
if (pBlockInfo) { if (pBlockInfo) {
// todo handle
// STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
// if (pScanInfo) {
// tsdbRowKeyAssign(&pDumpInfo->lastKey, &pScanInfo->lastProcKey);
// lastKey = pScanInfo->lastProcKey;
// }
pDumpInfo->totalRows = pBlockInfo->numRow; pDumpInfo->totalRows = pBlockInfo->numRow;
pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->info.order) ? 0 : pBlockInfo->numRow - 1; pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->info.order) ? 0 : pBlockInfo->numRow - 1;
} else { } else {
pDumpInfo->totalRows = 0; pDumpInfo->totalRows = 0;
pDumpInfo->rowIndex = 0; pDumpInfo->rowIndex = 0;
// pDumpInfo->lastKey.key.ts = lastKey;
} }
pDumpInfo->allDumped = false; pDumpInfo->allDumped = false;
// pDumpInfo->lastKey = lastKey;
} }
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
@ -3194,7 +3201,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList); code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList);
} else { // no block data, only last block exists } else { // no block data, only last block exists
tBlockDataReset(&pReader->status.fileBlockData); tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->info.order); resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo));
resetTableListIndex(&pReader->status); resetTableListIndex(&pReader->status);
} }
@ -3304,7 +3311,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
} }
tBlockDataReset(pBlockData); tBlockDataReset(pBlockData);
resetDataBlockIterator(pBlockIter, pReader->info.order); resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo));
resetTableListIndex(&pReader->status); resetTableListIndex(&pReader->status);
ERetrieveType type = doReadDataFromSttFiles(pReader); ERetrieveType type = doReadDataFromSttFiles(pReader);
@ -3381,8 +3388,35 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_
return (SVersionRange){.minVer = startVer, .maxVer = endVer}; return (SVersionRange){.minVer = startVer, .maxVer = endVer};
} }
static int32_t reverseSearchStartPos(const SArray* pDelList, int32_t index, int64_t key, bool asc) {
size_t num = taosArrayGetSize(pDelList);
int32_t start = index;
if (asc) {
if (start >= num - 1) {
start = num - 1;
}
TSDBKEY* p = taosArrayGet(pDelList, start);
while (p->ts >= key && start > 0) {
start -= 1;
}
} else {
if (index <= 0) {
start = 0;
}
TSDBKEY* p = taosArrayGet(pDelList, start);
while (p->ts <= key && start < num - 1) {
start += 1;
}
}
return start;
}
bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order, bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order,
SVersionRange* pVerRange) { SVersionRange* pVerRange, bool hasPk) {
if (pDelList == NULL || (TARRAY_SIZE(pDelList) == 0)) { if (pDelList == NULL || (TARRAY_SIZE(pDelList) == 0)) {
return false; return false;
} }
@ -3391,6 +3425,10 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t
bool asc = ASCENDING_TRAVERSE(order); bool asc = ASCENDING_TRAVERSE(order);
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
if (hasPk) { // handle the case where duplicated timestamps existed.
*index = reverseSearchStartPos(pDelList, *index, key, asc);
}
if (asc) { if (asc) {
if (*index >= num - 1) { if (*index >= num - 1) {
TSDBKEY* last = taosArrayGetLast(pDelList); TSDBKEY* last = taosArrayGetLast(pDelList);
@ -3503,7 +3541,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) { if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) {
return pRow; return pRow;
} else { } else {
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange); bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0);
if (!dropped) { if (!dropped) {
return pRow; return pRow;
} }
@ -3528,7 +3566,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) { if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) {
return pRow; return pRow;
} else { } else {
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange); bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0);
if (!dropped) { if (!dropped) {
return pRow; return pRow;
} }
@ -4118,7 +4156,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
} }
initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader);
resetDataBlockIterator(&pStatus->blockIter, pReader->info.order); resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo));
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pStatus->fileIter.numOfFiles == 0) { if (pStatus->fileIter.numOfFiles == 0) {
@ -4322,7 +4360,7 @@ void tsdbReaderClose2(STsdbReader* pReader) {
taosMemoryFree(pSupInfo->colId); taosMemoryFree(pSupInfo->colId);
tBlockDataDestroy(&pReader->status.fileBlockData); tBlockDataDestroy(&pReader->status.fileBlockData);
cleanupDataBlockIterator(&pReader->status.blockIter); cleanupDataBlockIterator(&pReader->status.blockIter, shouldFreePkBuf(&pReader->suppInfo));
size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
if (pReader->status.pTableMap != NULL) { if (pReader->status.pTableMap != NULL) {
@ -4338,9 +4376,11 @@ void tsdbReaderClose2(STsdbReader* pReader) {
SReadCostSummary* pCost = &pReader->cost; SReadCostSummary* pCost = &pReader->cost;
SFilesetIter* pFilesetIter = &pReader->status.fileIter; SFilesetIter* pFilesetIter = &pReader->status.fileIter;
if (pFilesetIter->pSttBlockReader != NULL) { if (pFilesetIter->pSttBlockReader != NULL) {
SSttBlockReader* pLReader = pFilesetIter->pSttBlockReader; SSttBlockReader* pSttBlockReader = pFilesetIter->pSttBlockReader;
tMergeTreeClose(&pLReader->mergeTree); tMergeTreeClose(&pSttBlockReader->mergeTree);
taosMemoryFree(pLReader);
clearRowKey(&pSttBlockReader->currentKey);
taosMemoryFree(pSttBlockReader);
} }
destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost); destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost);
@ -4996,7 +5036,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap); int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap);
initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader);
resetDataBlockIterator(pBlockIter, pReader->info.order); resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo));
resetTableListIndex(&pReader->status); resetTableListIndex(&pReader->status);
bool asc = ASCENDING_TRAVERSE(pReader->info.order); bool asc = ASCENDING_TRAVERSE(pReader->info.order);

View File

@ -167,6 +167,13 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void clearRowKey(SRowKey* pKey) {
if (pKey == NULL || pKey->numOfPKs == 0 || (!IS_VAR_DATA_TYPE(pKey->pks[0].type))) {
return;
}
taosMemoryFreeClear(pKey->pks[0].pData);
}
static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) { static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) {
int32_t numOfPks = pReader->suppInfo.numOfPks; int32_t numOfPks = pReader->suppInfo.numOfPks;
bool asc = ASCENDING_TRAVERSE(pReader->info.order); bool asc = ASCENDING_TRAVERSE(pReader->info.order);
@ -293,6 +300,11 @@ void clearBlockScanInfo(STableBlockScanInfo* p) {
p->pBlockIdxList = taosArrayDestroy(p->pBlockIdxList); p->pBlockIdxList = taosArrayDestroy(p->pBlockIdxList);
p->pMemDelData = taosArrayDestroy(p->pMemDelData); p->pMemDelData = taosArrayDestroy(p->pMemDelData);
p->pFileDelData = taosArrayDestroy(p->pFileDelData); p->pFileDelData = taosArrayDestroy(p->pFileDelData);
clearRowKey(&p->lastProcKey);
clearRowKey(&p->sttRange.skey);
clearRowKey(&p->sttRange.ekey);
clearRowKey(&p->sttKeyInfo.nextProcKey);
} }
void destroyAllBlockScanInfo(SSHashObj* pTableMap) { void destroyAllBlockScanInfo(SSHashObj* pTableMap) {
@ -415,7 +427,7 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
return pLeftBlock->offset > pRightBlock->offset ? 1 : -1; return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
} }
static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record, STsdbReader* pReader) { void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) {
pBlockInfo->uid = record->uid; pBlockInfo->uid = record->uid;
pBlockInfo->firstKey = record->firstKey.key.ts; pBlockInfo->firstKey = record->firstKey.key.ts;
pBlockInfo->lastKey = record->lastKey.key.ts; pBlockInfo->lastKey = record->lastKey.key.ts;
@ -434,27 +446,55 @@ static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* recor
if (IS_NUMERIC_TYPE(pFirstKey->pks[0].type)) { if (IS_NUMERIC_TYPE(pFirstKey->pks[0].type)) {
pBlockInfo->firstPk.val = pFirstKey->pks[0].val; pBlockInfo->firstPk.val = pFirstKey->pks[0].val;
pBlockInfo->lastPk.val = record->lastKey.key.pks[0].val; pBlockInfo->lastPk.val = record->lastKey.key.pks[0].val;
} else {
char* p = taosMemoryCalloc(1, pFirstKey->pks[0].nData + VARSTR_HEADER_SIZE);
memcpy(varDataVal(p), pFirstKey->pks[0].pData, pFirstKey->pks[0].nData);
varDataSetLen(p, pFirstKey->pks[0].nData);
pBlockInfo->firstPk.pData = (uint8_t*)p;
pBlockInfo->firstPKLen = 0; int32_t keyLen = record->lastKey.key.pks[0].nData;
pBlockInfo->lastPKLen = 0; p = taosMemoryCalloc(1, keyLen + VARSTR_HEADER_SIZE);
} else { // todo handle memory alloc error, opt memory alloc perf memcpy(varDataVal(p), record->lastKey.key.pks[0].pData, keyLen);
pBlockInfo->firstPKLen = pFirstKey->pks[0].nData; varDataSetLen(p, keyLen);
pBlockInfo->firstPk.pData = taosMemoryCalloc(1, pBlockInfo->firstPKLen); pBlockInfo->lastPk.pData = (uint8_t*)p;
memcpy(pBlockInfo->firstPk.pData, pFirstKey->pks[0].pData, pBlockInfo->firstPKLen);
pBlockInfo->lastPKLen = record->lastKey.key.pks[0].nData;
pBlockInfo->lastPk.pData = taosMemoryCalloc(1, pBlockInfo->lastPKLen);
memcpy(pBlockInfo->lastPk.pData, record->lastKey.key.pks[0].pData, pBlockInfo->lastPKLen);
} }
} }
} }
static void freePkItem(void* pItem) {
SFileDataBlockInfo* p = pItem;
taosMemoryFreeClear(p->firstPk.pData);
taosMemoryFreeClear(p->lastPk.pData);
}
void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree) {
pIter->index = -1;
pIter->numOfBlocks = 0;
if (needFree) {
taosArrayClearEx(pIter->blockList, freePkItem);
} else {
taosArrayClear(pIter->blockList);
}
}
void cleanupDataBlockIterator(SDataBlockIter* pIter, bool needFree) {
pIter->index = -1;
pIter->numOfBlocks = 0;
if (needFree) {
taosArrayDestroyEx(pIter->blockList, freePkItem);
} else {
taosArrayDestroy(pIter->blockList);
}
}
int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) { int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) {
bool asc = ASCENDING_TRAVERSE(pReader->info.order); bool asc = ASCENDING_TRAVERSE(pReader->info.order);
SBlockOrderSupporter sup = {0}; SBlockOrderSupporter sup = {0};
clearDataBlockIterator(pBlockIter, shouldFreePkBuf(&pReader->suppInfo));
pBlockIter->numOfBlocks = numOfBlocks; pBlockIter->numOfBlocks = numOfBlocks;
taosArrayClear(pBlockIter->blockList);
// access data blocks according to the offset of each block in asc/desc order. // access data blocks according to the offset of each block in asc/desc order.
int32_t numOfTables = taosArrayGetSize(pTableList); int32_t numOfTables = taosArrayGetSize(pTableList);
@ -482,9 +522,9 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf; sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
for (int32_t k = 0; k < num; ++k) { for (int32_t k = 0; k < num; ++k) {
SBrinRecord* pRecord = taosArrayGet(pTableScanInfo->pBlockList, k); SFileDataBlockInfo* pBlockInfo = taosArrayGet(pTableScanInfo->pBlockList, k);
sup.pDataBlockInfo[sup.numOfTables][k] = sup.pDataBlockInfo[sup.numOfTables][k] =
(SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pRecord->blockOffset, .pInfo = pTableScanInfo}; (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pBlockInfo->blockOffset, .pInfo = pTableScanInfo};
cnt++; cnt++;
} }
@ -499,20 +539,12 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
// since there is only one table qualified, blocks are not sorted // since there is only one table qualified, blocks are not sorted
if (sup.numOfTables == 1) { if (sup.numOfTables == 1) {
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, 0); STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, 0);
if (pTableScanInfo->pBlockIdxList == NULL) {
pTableScanInfo->pBlockIdxList = taosArrayInit(numOfBlocks, sizeof(STableDataBlockIdx));
}
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SFileDataBlockInfo blockInfo = {.tbBlockIdx = i};
SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i);
recordToBlockInfo(&blockInfo, record, pReader);
taosArrayPush(pBlockIter->blockList, &blockInfo);
STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i}; STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i};
taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx);
} }
taosArrayAddAll(pBlockIter->blockList, pTableScanInfo->pBlockList);
pTableScanInfo->pBlockList = taosArrayDestroy(pTableScanInfo->pBlockList); pTableScanInfo->pBlockList = taosArrayDestroy(pTableScanInfo->pBlockList);
int64_t et = taosGetTimestampUs(); int64_t et = taosGetTimestampUs();
@ -540,18 +572,13 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
int32_t pos = tMergeTreeGetChosenIndex(pTree); int32_t pos = tMergeTreeGetChosenIndex(pTree);
int32_t index = sup.indexPerTable[pos]++; int32_t index = sup.indexPerTable[pos]++;
SFileDataBlockInfo blockInfo = {.tbBlockIdx = index}; SFileDataBlockInfo* pBlockInfo = taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index);
SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index); taosArrayPush(pBlockIter->blockList, pBlockInfo);
recordToBlockInfo(&blockInfo, record, pReader);
taosArrayPush(pBlockIter->blockList, &blockInfo);
STableBlockScanInfo* pTableScanInfo = sup.pDataBlockInfo[pos][index].pInfo; STableBlockScanInfo* pTableScanInfo = sup.pDataBlockInfo[pos][index].pInfo;
if (pTableScanInfo->pBlockIdxList == NULL) { STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal};
size_t szTableDataBlocks = taosArrayGetSize(pTableScanInfo->pBlockList);
pTableScanInfo->pBlockIdxList = taosArrayInit(szTableDataBlocks, sizeof(STableDataBlockIdx));
}
STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal};
taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx);
// set data block index overflow, in order to disable the offset comparator // set data block index overflow, in order to disable the offset comparator
if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) { if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1; sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;

View File

@ -212,8 +212,6 @@ typedef struct SFileDataBlockInfo {
uint8_t* pData; uint8_t* pData;
} lastPk; } lastPk;
int32_t firstPKLen;
int32_t lastPKLen;
int64_t minVer; int64_t minVer;
int64_t maxVer; int64_t maxVer;
int64_t blockOffset; int64_t blockOffset;
@ -237,7 +235,6 @@ typedef struct SDataBlockIter {
typedef struct SFileBlockDumpInfo { typedef struct SFileBlockDumpInfo {
int32_t totalRows; int32_t totalRows;
int32_t rowIndex; int32_t rowIndex;
// int64_t lastKey;
// STsdbRowKey lastKey; // this key should be removed // STsdbRowKey lastKey; // this key should be removed
bool allDumped; bool allDumped;
} SFileBlockDumpInfo; } SFileBlockDumpInfo;
@ -338,6 +335,7 @@ int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileRead
int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo, int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,
TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList, TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList,
int32_t numOfTables); int32_t numOfTables);
void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record);
void destroyLDataIter(SLDataIter* pIter); void destroyLDataIter(SLDataIter* pIter);
int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet); int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet);
@ -347,6 +345,12 @@ bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STab
bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order); bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order);
int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2); int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2);
int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc); int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc);
void clearRowKey(SRowKey* pKey);
bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp);
void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk);
void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree);
void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk);
typedef struct { typedef struct {
SArray* pTombData; SArray* pTombData;
@ -382,6 +386,7 @@ typedef struct SCacheRowsReader {
SArray* pFuncTypeList; SArray* pFuncTypeList;
__compar_fn_t pkComparFn; __compar_fn_t pkComparFn;
SRowKey rowKey; SRowKey rowKey;
SColumnInfo pkColumn;
} SCacheRowsReader; } SCacheRowsReader;
int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype); int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype);

View File

@ -340,22 +340,21 @@ typedef struct STableMergeScanInfo {
int32_t scanTimes; int32_t scanTimes;
int32_t readIdx; int32_t readIdx;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
SSHashObj* mTableNumRows; // uid->num of table rows SSHashObj* mTableNumRows; // uid->num of table rows
SHashObj* mSkipTables; SHashObj* mSkipTables;
int64_t mergeLimit; int64_t mergeLimit;
SSortExecInfo sortExecInfo; SSortExecInfo sortExecInfo;
bool needCountEmptyTable; bool needCountEmptyTable;
bool bGroupProcessed; // the group return data means processed bool bGroupProcessed; // the group return data means processed
bool filesetDelimited; bool filesetDelimited;
bool bNewFilesetEvent; bool bNewFilesetEvent;
bool bNextDurationBlockEvent; bool bNextDurationBlockEvent;
int32_t numNextDurationBlocks; int32_t numNextDurationBlocks;
SSDataBlock* nextDurationBlocks[2]; SSDataBlock* nextDurationBlocks[2];
bool rtnNextDurationBlocks; bool rtnNextDurationBlocks;
int32_t nextDurationBlocksIdx; int32_t nextDurationBlocksIdx;
bool bSortRowId;
bool bSortRowId;
STmsSubTablesMergeInfo* pSubTablesMergeInfo; STmsSubTablesMergeInfo* pSubTablesMergeInfo;
} STableMergeScanInfo; } STableMergeScanInfo;

View File

@ -40,7 +40,7 @@ typedef struct SCacheRowsScanInfo {
SExprSupp pseudoExprSup; SExprSupp pseudoExprSup;
int32_t retrieveType; int32_t retrieveType;
int32_t currentGroupIndex; int32_t currentGroupIndex;
SSDataBlock* pBufferredRes; SSDataBlock* pBufferedRes;
SArray* pUidList; SArray* pUidList;
SArray* pCidList; SArray* pCidList;
int32_t indexOfBufferedRes; int32_t indexOfBufferedRes;
@ -160,9 +160,9 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
capacity = TMIN(totalTables, 4096); capacity = TMIN(totalTables, 4096);
pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false); pInfo->pBufferedRes = createOneDataBlock(pInfo->pRes, false);
setColIdForCacheReadBlock(pInfo->pBufferredRes, pScanNode); setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode);
blockDataEnsureCapacity(pInfo->pBufferredRes, capacity); blockDataEnsureCapacity(pInfo->pBufferedRes, capacity);
} else { // by tags } else { // by tags
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull); pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull);
capacity = 1; // only one row output capacity = 1; // only one row output
@ -219,18 +219,18 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
} }
if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) { if (pInfo->indexOfBufferedRes >= pInfo->pBufferedRes->info.rows) {
blockDataCleanup(pInfo->pBufferredRes); blockDataCleanup(pInfo->pBufferedRes);
taosArrayClear(pInfo->pUidList); taosArrayClear(pInfo->pUidList);
int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, pInfo->pSlotIds,
pInfo->pDstSlotIds, pInfo->pUidList); pInfo->pDstSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
// check for tag values // check for tag values
int32_t resultRows = pInfo->pBufferredRes->info.rows; int32_t resultRows = pInfo->pBufferedRes->info.rows;
// the results may be null, if last values are all null // the results may be null, if last values are all null
ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList)); ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList));
@ -239,12 +239,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) { if (pInfo->indexOfBufferedRes < pInfo->pBufferedRes->info.rows) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferredRes->pDataBlock); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferedRes->pDataBlock); ++i) {
SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i); SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i);
int32_t slotId = pCol->info.slotId; int32_t slotId = pCol->info.slotId;
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId); SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferedRes->pDataBlock, slotId);
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId); SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId);
if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) { if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) {
@ -350,7 +350,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
void destroyCacheScanOperator(void* param) { void destroyCacheScanOperator(void* param) {
SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param; SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param;
blockDataDestroy(pInfo->pRes); blockDataDestroy(pInfo->pRes);
blockDataDestroy(pInfo->pBufferredRes); blockDataDestroy(pInfo->pBufferedRes);
taosMemoryFree(pInfo->pSlotIds); taosMemoryFree(pInfo->pSlotIds);
taosMemoryFree(pInfo->pDstSlotIds); taosMemoryFree(pInfo->pDstSlotIds);
taosArrayDestroy(pInfo->pCidList); taosArrayDestroy(pInfo->pCidList);

View File

@ -216,11 +216,6 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
ASSERT(pColInfoData->info.type == pCol->type); ASSERT(pColInfoData->info.type == pCol->type);
if (colDataIsNull_s(pColInfoData, j)) { if (colDataIsNull_s(pColInfoData, j)) {
if ((pCol->flags & COL_IS_KEY)) {
qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type);
terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
goto _end;
}
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv); taosArrayPush(pVals, &cv);
} else { } else {
@ -248,11 +243,6 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL; terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
goto _end; goto _end;
} }
if ((pCol->flags & COL_IS_KEY)) {
qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type);
terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
goto _end;
}
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type
taosArrayPush(pVals, &cv); taosArrayPush(pVals, &cv);

View File

@ -255,11 +255,13 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo)
for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) {
SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i); SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i);
if (pItem->isPk) { if (pItem->isPk) {
SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId); SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId);
pBlockInfo->pks[0].type = pInfoData->info.type; pBlockInfo->pks[0].type = pInfoData->info.type;
pBlockInfo->pks[1].type = pInfoData->info.type; pBlockInfo->pks[1].type = pInfoData->info.type;
// allocate enough buffer size, which is pInfoData->info.bytes
if (IS_VAR_DATA_TYPE(pItem->dataType.type)) { if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes); pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
if (pBlockInfo->pks[0].pData == NULL) { if (pBlockInfo->pks[0].pData == NULL) {
@ -271,7 +273,12 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo)
taosMemoryFreeClear(pBlockInfo->pks[0].pData); taosMemoryFreeClear(pBlockInfo->pks[0].pData);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pBlockInfo->pks[0].nData = pInfoData->info.bytes;
pBlockInfo->pks[1].nData = pInfoData->info.bytes;
} }
break;
} }
} }

View File

@ -708,9 +708,7 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size); tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size);
pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo); pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo);
pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1); pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1);
pInfo->pResBlock->info.blankFill = false; pInfo->pResBlock->info.blankFill = false;
if (!pInfo->needCountEmptyTable) { if (!pInfo->needCountEmptyTable) {
@ -1011,8 +1009,8 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
STableScanInfo* pInfo = pOperator->info; STableScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int32_t num = 0; int32_t num = 0;
STableKeyInfo* pList = NULL; STableKeyInfo* pList = NULL;
if (pInfo->currentGroupId == -1) { if (pInfo->currentGroupId == -1) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
@ -1020,7 +1018,10 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
taosRLockLatch(&pTaskInfo->lock);
initNextGroupScan(pInfo, &pList, &num); initNextGroupScan(pInfo, &pList, &num);
taosRUnLockLatch(&pTaskInfo->lock);
ASSERT(pInfo->base.dataReader == NULL); ASSERT(pInfo->base.dataReader == NULL);
int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
@ -4069,14 +4070,13 @@ static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) {
} }
static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) { static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
STableMergeScanInfo* pInfo = pOperator->info; STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSDataBlock* pBlock = pInfo->pReaderBlock;
SSDataBlock* pBlock = pInfo->pReaderBlock; int32_t code = 0;
int32_t code = 0; bool hasNext = false;
bool hasNext = false; STsdbReader* reader = pInfo->base.dataReader;
STsdbReader* reader = pInfo->base.dataReader;
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
if (code != 0) { if (code != 0) {
@ -4112,27 +4112,24 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe
*pSkipped = true; *pSkipped = true;
return; return;
} }
return; return;
} }
static SSDataBlock* getBlockForTableMergeScan(void* param) { static SSDataBlock* getBlockForTableMergeScan(void* param) {
STableMergeScanSortSourceParam* source = param; STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSDataBlock* pBlock = NULL; SOperatorInfo* pOperator = source->pOperator;
int32_t code = 0; STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSDataBlock* pBlock = NULL;
int64_t st = taosGetTimestampUs();
int64_t st = taosGetTimestampUs();
bool hasNext = false;
STsdbReader* reader = pInfo->base.dataReader;
while (true) { while (true) {
if (pInfo->rtnNextDurationBlocks) { if (pInfo->rtnNextDurationBlocks) {
qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d", qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d",
GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks); GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks);
if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) { if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) {
pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]; pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx];
++pInfo->nextDurationBlocksIdx; ++pInfo->nextDurationBlocksIdx;
@ -4141,19 +4138,19 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
blockDataDestroy(pInfo->nextDurationBlocks[i]); blockDataDestroy(pInfo->nextDurationBlocks[i]);
pInfo->nextDurationBlocks[i] = NULL; pInfo->nextDurationBlocks[i] = NULL;
} }
pInfo->rtnNextDurationBlocks = false; pInfo->rtnNextDurationBlocks = false;
pInfo->nextDurationBlocksIdx = 0; pInfo->nextDurationBlocksIdx = 0;
pInfo->numNextDurationBlocks = 0; pInfo->numNextDurationBlocks = 0;
continue; continue;
} }
} else { } else {
bool bFinished = false; bool bFinished = false;
bool bSkipped = false; bool bSkipped = false;
doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped); doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped);
pBlock = pInfo->pReaderBlock; pBlock = pInfo->pReaderBlock;
qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d", qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d",
GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent); GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent);
if (bFinished) { if (bFinished) {
pInfo->bNewFilesetEvent = false; pInfo->bNewFilesetEvent = false;
break; break;
@ -4164,15 +4161,18 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true); pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true);
++pInfo->numNextDurationBlocks; ++pInfo->numNextDurationBlocks;
if (pInfo->numNextDurationBlocks > 2) { if (pInfo->numNextDurationBlocks > 2) {
qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), pInfo->numNextDurationBlocks); qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo),
pInfo->numNextDurationBlocks);
pInfo->bNewFilesetEvent = false; pInfo->bNewFilesetEvent = false;
break; break;
} }
} }
if (pInfo->bNewFilesetEvent) { if (pInfo->bNewFilesetEvent) {
pInfo->rtnNextDurationBlocks = true; pInfo->rtnNextDurationBlocks = true;
return NULL; return NULL;
} }
if (pInfo->bNextDurationBlockEvent) { if (pInfo->bNextDurationBlockEvent) {
pInfo->bNextDurationBlockEvent = false; pInfo->bNextDurationBlockEvent = false;
continue; continue;
@ -4180,10 +4180,10 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
} }
if (bSkipped) continue; if (bSkipped) continue;
} }
pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid); pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
return pBlock; return pBlock;
@ -4192,7 +4192,6 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
return NULL; return NULL;
} }
SArray* generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order) { SArray* generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order) {
SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
SBlockOrderInfo biTs = {0}; SBlockOrderInfo biTs = {0};

View File

@ -1339,23 +1339,26 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
// The gap is less than the threshold, so it belongs to current session window that has been opened already. // The gap is less than the threshold, so it belongs to current session window that has been opened already.
doKeepTuple(pRowSup, tsList[j], gid); doKeepTuple(pRowSup, tsList[j], gid);
} else { // start a new session window } else { // start a new session window
SResultRow* pResult = NULL; // start a new session window
if (pRowSup->numOfRows > 0) { // handled data that belongs to the previous session window
SResultRow* pResult = NULL;
// keep the time window for the closed time window. // keep the time window for the closed time window.
STimeWindow window = pRowSup->win; STimeWindow window = pRowSup->win;
pRowSup->win.ekey = pRowSup->win.skey; int32_t ret =
int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
}
// pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
} }
// pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
// here we start a new session window // here we start a new session window
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
doKeepTuple(pRowSup, tsList[j], gid); doKeepTuple(pRowSup, tsList[j], gid);

View File

@ -1649,8 +1649,8 @@ static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSH
} }
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize); size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize);
@ -1688,12 +1688,12 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
if (pBlk != NULL) { if (pBlk != NULL) {
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId); SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId);
int64_t firstRowTs = *(int64_t*)tsCol->pData; int64_t firstRowTs = *(int64_t*)tsCol->pData;
if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
(pOrigTsOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) { (pOrigTsOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
if (bExtractedBlock) { if (bExtractedBlock) {
blockDataDestroy(pBlk); blockDataDestroy(pBlk);
} }
continue; continue;
} }
} }

View File

@ -2657,8 +2657,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
// please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first // please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first function.
// function.
#if 0 #if 0
if (blockDataOrder == TSDB_ORDER_ASC) { if (blockDataOrder == TSDB_ORDER_ASC) {
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
@ -2709,6 +2708,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
} }
} }
#else #else
// todo refactor
if (!pInputCol->hasNull && !pCtx->hasPrimaryKey) { if (!pInputCol->hasNull && !pCtx->hasPrimaryKey) {
numOfElems = 1; numOfElems = 1;
@ -2790,7 +2791,6 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
} }
} }
// SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -1657,9 +1657,6 @@ static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, STo
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
return buildSyntaxErrMsg(&pCxt->msg, "Primary timestamp column should not be null", pToken->z); return buildSyntaxErrMsg(&pCxt->msg, "Primary timestamp column should not be null", pToken->z);
} }
if (pSchema->flags & COL_IS_KEY) {
return buildSyntaxErrMsg(&pCxt->msg, "Primary key column should not be null", pToken->z);
}
pVal->flag = CV_FLAG_NULL; pVal->flag = CV_FLAG_NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -267,11 +267,6 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in
pBind = bind + c; pBind = bind + c;
} }
if(pBind->is_null && (pColSchema->flags & COL_IS_KEY)){
code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null");
goto _return;
}
code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1); code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1);
if (code) { if (code) {
goto _return; goto _return;
@ -318,11 +313,6 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
pBind = bind; pBind = bind;
} }
if (pBind->is_null && (pColSchema->flags & COL_IS_KEY)) {
code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null");
goto _return;
}
tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1); tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1);
qDebug("stmt col %d bind %d rows data", colIdx, rowNum); qDebug("stmt col %d bind %d rows data", colIdx, rowNum);

View File

@ -8271,7 +8271,7 @@ static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList** ppC
if (TSDB_CODE_SUCCESS == code && !hasPrimaryKey && hasPkInTable(pMeta)) { if (TSDB_CODE_SUCCESS == code && !hasPrimaryKey && hasPkInTable(pMeta)) {
code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM,
"Primary key column of dest table can not be null"); "Primary key column name must be defined in existed-stable field");
} }
SNodeList* pNewProjections = NULL; SNodeList* pNewProjections = NULL;

View File

@ -78,21 +78,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely); SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely);
if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
if (commitIndex >= ths->assignedCommitIndex) { if (commitIndex >= ths->assignedCommitIndex) {
terrno = TSDB_CODE_SUCCESS; syncNodeStepDown(ths, pMsg->term);
raftStoreNextTerm(ths);
if (terrno != TSDB_CODE_SUCCESS) {
sError("vgId:%d, failed to update term, reason:%s", ths->vgId, tstrerror(terrno));
return -1;
}
if (syncNodeAssignedLeader2Leader(ths) != 0) {
sError("vgId:%d, failed to change state from assigned leader to leader", ths->vgId);
return -1;
}
taosThreadMutexLock(&ths->arbTokenMutex);
syncUtilGenerateArbToken(ths->myNodeInfo.nodeId, ths->vgId, ths->arbToken);
sInfo("vgId:%d, assigned leader to leader, arbToken:%s", ths->vgId, ths->arbToken);
taosThreadMutexUnlock(&ths->arbTokenMutex);
} }
} else { } else {
(void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex); (void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex);

View File

@ -503,20 +503,6 @@ int32_t syncEndSnapshot(int64_t rid) {
return code; return code;
} }
#ifdef BUILD_NO_CALL
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
sError("sync step down error");
return -1;
}
syncNodeStepDown(pSyncNode, newTerm);
syncNodeRelease(pSyncNode);
return 0;
}
#endif
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) { bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
@ -1277,7 +1263,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
// start in syncNodeStart // start in syncNodeStart
// start raft // start raft
// syncNodeBecomeFollower(pSyncNode);
int64_t timeNow = taosGetTimestampMs(); int64_t timeNow = taosGetTimestampMs();
pSyncNode->startTime = timeNow; pSyncNode->startTime = timeNow;
@ -1848,20 +1833,6 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// persist cfg // persist cfg
syncWriteCfgFile(pSyncNode); syncWriteCfgFile(pSyncNode);
#if 0
// change isStandBy to normal (election timeout)
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(pSyncNode, "");
// Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop(pSyncNode);
// syncMaybeAdvanceCommitIndex(pSyncNode);
} else {
syncNodeBecomeFollower(pSyncNode, "");
}
#endif
} else { } else {
// persist cfg // persist cfg
syncWriteCfgFile(pSyncNode); syncWriteCfgFile(pSyncNode);
@ -1874,18 +1845,6 @@ _END:
} }
// raft state change -------------- // raft state change --------------
#ifdef BUILD_NO_CALL
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
if (term > raftStoreGetTerm(pSyncNode)) {
raftStoreSetTerm(pSyncNode, term);
char tmpBuf[64];
snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term);
syncNodeBecomeFollower(pSyncNode, tmpBuf);
raftStoreClearVote(pSyncNode);
}
}
#endif
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
if (term > raftStoreGetTerm(pSyncNode)) { if (term > raftStoreGetTerm(pSyncNode)) {
raftStoreSetTerm(pSyncNode, term); raftStoreSetTerm(pSyncNode, term);
@ -1903,13 +1862,19 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm); sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
} while (0); } while (0);
if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
taosThreadMutexLock(&pSyncNode->arbTokenMutex);
syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
}
if (currentTerm < newTerm) { if (currentTerm < newTerm) {
raftStoreSetTerm(pSyncNode, newTerm); raftStoreSetTerm(pSyncNode, newTerm);
char tmpBuf[64]; char tmpBuf[64];
snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm); snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm);
syncNodeBecomeFollower(pSyncNode, tmpBuf); syncNodeBecomeFollower(pSyncNode, tmpBuf);
raftStoreClearVote(pSyncNode); raftStoreClearVote(pSyncNode);
} else { } else {
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
syncNodeBecomeFollower(pSyncNode, "step down"); syncNodeBecomeFollower(pSyncNode, "step down");
@ -2170,28 +2135,6 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
sNTrace(pSyncNode, "follower to candidate"); sNTrace(pSyncNode, "follower to candidate");
} }
#ifdef BUILD_NO_CALL
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
syncNodeBecomeFollower(pSyncNode, "leader to follower");
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
sNTrace(pSyncNode, "leader to follower");
}
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
syncNodeBecomeFollower(pSyncNode, "candidate to follower");
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
sNTrace(pSyncNode, "candidate to follower");
}
#endif
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) { int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER); ASSERT(pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
syncNodeBecomeLeader(pSyncNode, "assigned leader to leader"); syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");

View File

@ -629,6 +629,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SECOND_COL_PK, "primary key must be
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_PK_TYPE, "primary key column must be of type int, uint, bigint, ubigint, and varchar") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_PK_TYPE, "primary key column must be of type int, uint, bigint, ubigint, and varchar")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_PK_OP, "primary key column can not be added, modified, and dropped") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_PK_OP, "primary key column can not be added, modified, and dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL, "Primary key column should not be null") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL, "Primary key column should not be null")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE, "Primary key column should not be none")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")
//planner //planner