Merge branch 'feat/TS-4243-3.0' of https://github.com/taosdata/TDengine into feat/ly-TS-4243-3.0
This commit is contained in:
commit
135bdb37a0
|
@ -197,7 +197,7 @@ typedef struct STableDataCxt {
|
||||||
SBoundColInfo boundColsInfo;
|
SBoundColInfo boundColsInfo;
|
||||||
SArray* pValues;
|
SArray* pValues;
|
||||||
SSubmitTbData* pData;
|
SSubmitTbData* pData;
|
||||||
TSKEY lastTs;
|
SRowKey lastKey;
|
||||||
bool ordered;
|
bool ordered;
|
||||||
bool duplicateTs;
|
bool duplicateTs;
|
||||||
} STableDataCxt;
|
} STableDataCxt;
|
||||||
|
|
|
@ -557,13 +557,10 @@ void tRowDestroy(SRow *pRow) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tRowPCmprFn(const void *p1, const void *p2) {
|
static int32_t tRowPCmprFn(const void *p1, const void *p2) {
|
||||||
if ((*(SRow **)p1)->ts < (*(SRow **)p2)->ts) {
|
SRowKey key1, key2;
|
||||||
return -1;
|
tRowGetKey(*(SRow **)p1, &key1);
|
||||||
} else if ((*(SRow **)p1)->ts > (*(SRow **)p2)->ts) {
|
tRowGetKey(*(SRow **)p2, &key2);
|
||||||
return 1;
|
return tRowKeyCompare(&key1, &key2);
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
static void tRowPDestroy(SRow **ppRow) { tRowDestroy(*ppRow); }
|
static void tRowPDestroy(SRow **ppRow) { tRowDestroy(*ppRow); }
|
||||||
static int32_t tRowMergeImpl(SArray *aRowP, STSchema *pTSchema, int32_t iStart, int32_t iEnd, int8_t flag) {
|
static int32_t tRowMergeImpl(SArray *aRowP, STSchema *pTSchema, int32_t iStart, int32_t iEnd, int8_t flag) {
|
||||||
|
@ -645,13 +642,18 @@ int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag) {
|
||||||
|
|
||||||
int32_t iStart = 0;
|
int32_t iStart = 0;
|
||||||
while (iStart < aRowP->size) {
|
while (iStart < aRowP->size) {
|
||||||
SRow *pRow = (SRow *)taosArrayGetP(aRowP, iStart);
|
SRowKey key1;
|
||||||
|
SRow *row1 = (SRow *)taosArrayGetP(aRowP, iStart);
|
||||||
|
|
||||||
|
tRowGetKey(row1, &key1);
|
||||||
|
|
||||||
int32_t iEnd = iStart + 1;
|
int32_t iEnd = iStart + 1;
|
||||||
while (iEnd < aRowP->size) {
|
while (iEnd < aRowP->size) {
|
||||||
SRow *pRowT = (SRow *)taosArrayGetP(aRowP, iEnd);
|
SRowKey key2;
|
||||||
|
SRow *row2 = (SRow *)taosArrayGetP(aRowP, iEnd);
|
||||||
|
tRowGetKey(row2, &key2);
|
||||||
|
|
||||||
if (pRow->ts != pRowT->ts) break;
|
if (tRowKeyCompare(&key1, &key2) != 0) break;
|
||||||
|
|
||||||
iEnd++;
|
iEnd++;
|
||||||
}
|
}
|
||||||
|
|
|
@ -836,6 +836,8 @@ struct SLDataIter {
|
||||||
STimeWindow timeWindow;
|
STimeWindow timeWindow;
|
||||||
SVersionRange verRange;
|
SVersionRange verRange;
|
||||||
SSttBlockLoadInfo *pBlockLoadInfo;
|
SSttBlockLoadInfo *pBlockLoadInfo;
|
||||||
|
SRowKey startRowKey; // current row key
|
||||||
|
__compar_fn_t comparFn;
|
||||||
bool ignoreEarlierTs;
|
bool ignoreEarlierTs;
|
||||||
struct SSttFileReader *pReader;
|
struct SSttFileReader *pReader;
|
||||||
};
|
};
|
||||||
|
@ -846,7 +848,7 @@ struct SSttFileReader;
|
||||||
typedef int32_t (*_load_tomb_fn)(STsdbReader *pReader, struct SSttFileReader *pSttFileReader,
|
typedef int32_t (*_load_tomb_fn)(STsdbReader *pReader, struct SSttFileReader *pSttFileReader,
|
||||||
SSttBlockLoadInfo *pLoadInfo);
|
SSttBlockLoadInfo *pLoadInfo);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct SMergeTreeConf {
|
||||||
int8_t backward;
|
int8_t backward;
|
||||||
STsdb *pTsdb;
|
STsdb *pTsdb;
|
||||||
uint64_t suid;
|
uint64_t suid;
|
||||||
|
@ -859,7 +861,9 @@ typedef struct {
|
||||||
STSchema *pSchema;
|
STSchema *pSchema;
|
||||||
int16_t *pCols;
|
int16_t *pCols;
|
||||||
int32_t numOfCols;
|
int32_t numOfCols;
|
||||||
|
SRowKey *pCurRowKey;
|
||||||
_load_tomb_fn loadTombFn;
|
_load_tomb_fn loadTombFn;
|
||||||
|
__compar_fn_t comparFn;
|
||||||
void *pReader;
|
void *pReader;
|
||||||
void *idstr;
|
void *idstr;
|
||||||
bool rspRows; // response the rows in stt-file, if possible
|
bool rspRows; // response the rows in stt-file, if possible
|
||||||
|
|
|
@ -479,6 +479,9 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32
|
||||||
pIter->verRange.maxVer = pConf->verRange.maxVer;
|
pIter->verRange.maxVer = pConf->verRange.maxVer;
|
||||||
pIter->timeWindow.skey = pConf->timewindow.skey;
|
pIter->timeWindow.skey = pConf->timewindow.skey;
|
||||||
pIter->timeWindow.ekey = pConf->timewindow.ekey;
|
pIter->timeWindow.ekey = pConf->timewindow.ekey;
|
||||||
|
pIter->comparFn = pConf->comparFn;
|
||||||
|
|
||||||
|
tRowKeyAssign(&pIter->startRowKey, pConf->pCurRowKey);
|
||||||
pIter->pReader = pSttFileReader;
|
pIter->pReader = pSttFileReader;
|
||||||
pIter->pBlockLoadInfo = pBlockLoadInfo;
|
pIter->pBlockLoadInfo = pBlockLoadInfo;
|
||||||
|
|
||||||
|
@ -621,15 +624,37 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
|
||||||
if (!pIter->backward) { // asc
|
if (!pIter->backward) { // asc
|
||||||
if (ts > pIter->timeWindow.ekey) { // no more data
|
if (ts > pIter->timeWindow.ekey) { // no more data
|
||||||
break;
|
break;
|
||||||
} else if (ts < pIter->timeWindow.skey) {
|
} else {
|
||||||
|
if (ts < pIter->timeWindow.skey) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (ts == pIter->timeWindow.skey && pIter->startRowKey.numOfPKs > 0) {
|
||||||
|
SRowKey key;
|
||||||
|
tColRowGetKey(pData, i, &key);
|
||||||
|
int32_t ret = pkCompEx(pIter->comparFn, &key, &pIter->startRowKey);
|
||||||
|
if (ret < 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if (ts < pIter->timeWindow.skey) {
|
if (ts < pIter->timeWindow.skey) {
|
||||||
break;
|
break;
|
||||||
} else if (ts > pIter->timeWindow.ekey) {
|
} else {
|
||||||
|
if (ts > pIter->timeWindow.ekey) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (ts == pIter->timeWindow.ekey && pIter->startRowKey.numOfPKs > 0) {
|
||||||
|
SRowKey key;
|
||||||
|
tColRowGetKey(pData, i, &key);
|
||||||
|
int32_t ret = pkCompEx(pIter->comparFn, &key, &pIter->startRowKey);
|
||||||
|
if (ret > 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t ver = pData->aVersion[i];
|
int64_t ver = pData->aVersion[i];
|
||||||
|
@ -802,8 +827,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
|
||||||
|
|
||||||
STimeWindow w = {0};
|
STimeWindow w = {0};
|
||||||
int64_t numOfRows = 0;
|
int64_t numOfRows = 0;
|
||||||
|
|
||||||
int64_t cid = pSttLevel->fobjArr->data[i]->f->cid;
|
int64_t cid = pSttLevel->fobjArr->data[i]->f->cid;
|
||||||
|
|
||||||
code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &w, &numOfRows,
|
code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &w, &numOfRows,
|
||||||
pMTree->idStr);
|
pMTree->idStr);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -89,7 +89,7 @@ static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWi
|
||||||
|
|
||||||
static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus);
|
static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus);
|
||||||
|
|
||||||
static int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
|
int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
|
||||||
if (p2 == NULL) {
|
if (p2 == NULL) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
@ -118,12 +118,13 @@ static void tColRowGetKeyDeepCopy(SBlockData* pBlock, int32_t irow, int32_t slot
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pKey->numOfPKs = 1;
|
|
||||||
|
|
||||||
SColData* pColData = &pBlock->aColData[slotId];
|
SColData* pColData = &pBlock->aColData[slotId];
|
||||||
SColVal cv;
|
SColVal cv;
|
||||||
tColDataGetValue(pColData, irow, &cv);
|
tColDataGetValue(pColData, irow, &cv);
|
||||||
|
|
||||||
|
pKey->numOfPKs = 1;
|
||||||
|
pKey->pks[0].type = cv.value.type;
|
||||||
|
|
||||||
if (IS_NUMERIC_TYPE(cv.value.type)) {
|
if (IS_NUMERIC_TYPE(cv.value.type)) {
|
||||||
pKey->pks[0].val = cv.value.val;
|
pKey->pks[0].val = cv.value.val;
|
||||||
} else {
|
} else {
|
||||||
|
@ -649,7 +650,6 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
||||||
int32_t k = 0;
|
int32_t k = 0;
|
||||||
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
|
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
|
||||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||||
int32_t step = asc ? 1 : -1;
|
|
||||||
STimeWindow w = pReader->info.window;
|
STimeWindow w = pReader->info.window;
|
||||||
SBrinRecord* pRecord = NULL;
|
SBrinRecord* pRecord = NULL;
|
||||||
|
|
||||||
|
@ -709,9 +709,15 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (asc) {
|
||||||
if (pkCompEx(pReader->pkComparFn, &pRecord->lastKey.key, &pScanInfo->lastProcKey) <= 0) {
|
if (pkCompEx(pReader->pkComparFn, &pRecord->lastKey.key, &pScanInfo->lastProcKey) <= 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
if (pkCompEx(pReader->pkComparFn, &pRecord->firstKey.key, &pScanInfo->lastProcKey) >= 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 2. version range check, version range is an CLOSED interval
|
// 2. version range check, version range is an CLOSED interval
|
||||||
if (pRecord->minVer > pReader->info.verRange.maxVer || pRecord->maxVer < pReader->info.verRange.minVer) {
|
if (pRecord->minVer > pReader->info.verRange.maxVer || pRecord->maxVer < pReader->info.verRange.minVer) {
|
||||||
|
@ -1394,7 +1400,7 @@ static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, STableBlockScanI
|
||||||
|
|
||||||
int64_t key = 0;
|
int64_t key = 0;
|
||||||
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
|
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
|
||||||
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
|
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey.ts;
|
||||||
key = ascScan ? TMIN(pBlock->firstKey, keyInStt) : TMAX(pBlock->lastKey, keyInStt);
|
key = ascScan ? TMIN(pBlock->firstKey, keyInStt) : TMAX(pBlock->lastKey, keyInStt);
|
||||||
} else {
|
} else {
|
||||||
key = ascScan ? pBlock->firstKey : pBlock->lastKey;
|
key = ascScan ? pBlock->firstKey : pBlock->lastKey;
|
||||||
|
@ -1437,9 +1443,10 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
|
||||||
pInfo->hasDupTs = (pBlockInfo->numRow > pBlockInfo->count) || (pBlockInfo->count <= 0);
|
pInfo->hasDupTs = (pBlockInfo->numRow > pBlockInfo->count) || (pBlockInfo->count <= 0);
|
||||||
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pRecord, pReader->info.order);
|
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pRecord, pReader->info.order);
|
||||||
|
|
||||||
|
// todo handle the primary key overlap case
|
||||||
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
|
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
|
||||||
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
|
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
|
||||||
int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey;
|
int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey.ts;
|
||||||
pInfo->overlapWithSttBlock = !(pBlockInfo->lastKey < nextProcKeyInStt || pBlockInfo->firstKey > nextProcKeyInStt);
|
pInfo->overlapWithSttBlock = !(pBlockInfo->lastKey < nextProcKeyInStt || pBlockInfo->firstKey > nextProcKeyInStt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1538,6 +1545,7 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
|
||||||
SVersionRange* pVerRange) {
|
SVersionRange* pVerRange) {
|
||||||
int32_t order = pSttBlockReader->order;
|
int32_t order = pSttBlockReader->order;
|
||||||
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
|
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
|
||||||
|
SRowKey* pNextProc = &pScanInfo->sttKeyInfo.nextProcKey;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
bool hasVal = tMergeTreeNext(&pSttBlockReader->mergeTree);
|
bool hasVal = tMergeTreeNext(&pSttBlockReader->mergeTree);
|
||||||
|
@ -1545,7 +1553,14 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
|
||||||
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
|
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
|
||||||
|
|
||||||
// next file, the timestamps in the next file must be greater than those in current
|
// next file, the timestamps in the next file must be greater than those in current
|
||||||
pScanInfo->sttKeyInfo.nextProcKey += step;
|
pNextProc->ts += step;
|
||||||
|
if (pSttBlockReader->numOfPks > 0) {
|
||||||
|
if (IS_NUMERIC_TYPE(pNextProc->pks[0].type)) {
|
||||||
|
pNextProc->pks[0].val = INT64_MIN;
|
||||||
|
} else {
|
||||||
|
memset(pNextProc->pks[0].pData, 0, pNextProc->pks[0].nData);
|
||||||
|
}
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1559,7 +1574,8 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
|
||||||
tColRowGetKeyDeepCopy(pRow->pBlockData, pRow->iRow, pkSrcSlot, &pSttBlockReader->currentKey);
|
tColRowGetKeyDeepCopy(pRow->pBlockData, pRow->iRow, pkSrcSlot, &pSttBlockReader->currentKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
pScanInfo->sttKeyInfo.nextProcKey = key;
|
tColRowGetKeyDeepCopy(pRow->pBlockData, pRow->iRow, pkSrcSlot, pNextProc);
|
||||||
|
|
||||||
if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) {
|
if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) {
|
||||||
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange)) {
|
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange)) {
|
||||||
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
|
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
|
||||||
|
@ -1850,9 +1866,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
|
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
|
||||||
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
|
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
|
||||||
|
|
||||||
SRowKey* pSttKey = NULL;
|
SRowKey* pSttKey = &(SRowKey){0};
|
||||||
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
|
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
|
||||||
pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
|
tRowKeyAssign(pSttKey, getCurrentKeyInSttBlock(pSttBlockReader));
|
||||||
|
} else {
|
||||||
|
pSttKey = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRowKey* pfKey = &(SRowKey){0};
|
SRowKey* pfKey = &(SRowKey){0};
|
||||||
|
@ -1891,7 +1909,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SRowKey minKey;
|
SRowKey minKey = {0};
|
||||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||||
minKey = k; // let's find the minimum
|
minKey = k; // let's find the minimum
|
||||||
|
|
||||||
|
@ -1899,11 +1917,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
minKey = ik;
|
minKey = ik;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pfKey != NULL && (pkCompEx(compFn, pfKey, &minKey) < 0)) {
|
if ((pfKey != NULL) && (pkCompEx(compFn, pfKey, &minKey) < 0)) {
|
||||||
minKey = *pfKey;
|
minKey = *pfKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSttKey != NULL && (pkCompEx(compFn, pSttKey, &minKey) < 0)) {
|
if ((pSttKey != NULL) && (pkCompEx(compFn, pSttKey, &minKey) < 0)) {
|
||||||
minKey = *pSttKey;
|
minKey = *pSttKey;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1912,11 +1930,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
minKey = ik;
|
minKey = ik;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pfKey != NULL && (pkCompEx(compFn, pfKey, &minKey) > 0)) {
|
if ((pfKey != NULL) && (pkCompEx(compFn, pfKey, &minKey) > 0)) {
|
||||||
minKey = *pfKey;
|
minKey = *pfKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSttKey != NULL && (pkCompEx(compFn, pSttKey, &minKey) > 0)) {
|
if ((pSttKey != NULL) && (pkCompEx(compFn, pSttKey, &minKey) > 0)) {
|
||||||
minKey = *pSttKey;
|
minKey = *pSttKey;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2042,7 +2060,9 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
|
||||||
STbData* d = NULL;
|
STbData* d = NULL;
|
||||||
STbData* di = NULL;
|
STbData* di = NULL;
|
||||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||||
|
bool forward = true;
|
||||||
STsdbReadSnap* pSnap = pReader->pReadSnap;
|
STsdbReadSnap* pSnap = pReader->pReadSnap;
|
||||||
|
STimeWindow* pWindow = &pReader->info.window;
|
||||||
|
|
||||||
if (pBlockScanInfo->iterInit) {
|
if (pBlockScanInfo->iterInit) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2051,6 +2071,10 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
|
||||||
STsdbRowKey startKey = {0};
|
STsdbRowKey startKey = {0};
|
||||||
tRowKeyAssign(&startKey.key, &pBlockScanInfo->lastProcKey);
|
tRowKeyAssign(&startKey.key, &pBlockScanInfo->lastProcKey);
|
||||||
startKey.version = asc ? pReader->info.verRange.minVer : pReader->info.verRange.maxVer;
|
startKey.version = asc ? pReader->info.verRange.minVer : pReader->info.verRange.maxVer;
|
||||||
|
if ((asc && (startKey.key.ts < pWindow->skey)) || ((!asc) && startKey.key.ts > pWindow->ekey)) {
|
||||||
|
startKey.key.ts = asc? pWindow->skey:pWindow->ekey;
|
||||||
|
forward = false;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pSnap->pMem, &pBlockScanInfo->iter, "mem");
|
int32_t code = doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pSnap->pMem, &pBlockScanInfo->iter, "mem");
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2063,7 +2087,10 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
|
||||||
}
|
}
|
||||||
|
|
||||||
loadMemTombData(&pBlockScanInfo->pMemDelData, d, di, pReader->info.verRange.maxVer);
|
loadMemTombData(&pBlockScanInfo->pMemDelData, d, di, pReader->info.verRange.maxVer);
|
||||||
|
|
||||||
|
if (forward) {
|
||||||
forwardDataIter(&startKey.key, pBlockScanInfo, pReader);
|
forwardDataIter(&startKey.key, pBlockScanInfo, pReader);
|
||||||
|
}
|
||||||
|
|
||||||
pBlockScanInfo->iterInit = true;
|
pBlockScanInfo->iterInit = true;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2115,6 +2142,7 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable
|
||||||
|
|
||||||
static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
|
static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
|
||||||
bool hasData = true;
|
bool hasData = true;
|
||||||
|
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||||
|
|
||||||
// the stt block reader has been initialized for this table.
|
// the stt block reader has been initialized for this table.
|
||||||
if (pSttBlockReader->uid == pScanInfo->uid) {
|
if (pSttBlockReader->uid == pScanInfo->uid) {
|
||||||
|
@ -2133,10 +2161,10 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
}
|
}
|
||||||
|
|
||||||
STimeWindow w = pSttBlockReader->window;
|
STimeWindow w = pSttBlockReader->window;
|
||||||
if (ASCENDING_TRAVERSE(pSttBlockReader->order)) {
|
if (asc) {
|
||||||
w.skey = pScanInfo->sttKeyInfo.nextProcKey;
|
w.skey = pScanInfo->sttKeyInfo.nextProcKey.ts;
|
||||||
} else {
|
} else {
|
||||||
w.ekey = pScanInfo->sttKeyInfo.nextProcKey;
|
w.ekey = pScanInfo->sttKeyInfo.nextProcKey.ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
@ -2157,6 +2185,8 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
.pCols = pReader->suppInfo.colId,
|
.pCols = pReader->suppInfo.colId,
|
||||||
.numOfCols = pReader->suppInfo.numOfCols,
|
.numOfCols = pReader->suppInfo.numOfCols,
|
||||||
.loadTombFn = loadSttTombDataForAll,
|
.loadTombFn = loadSttTombDataForAll,
|
||||||
|
.pCurRowKey = &pScanInfo->sttKeyInfo.nextProcKey,
|
||||||
|
.comparFn = pReader->pkComparFn,
|
||||||
.pReader = pReader,
|
.pReader = pReader,
|
||||||
.idstr = pReader->idStr,
|
.idstr = pReader->idStr,
|
||||||
.rspRows = (pReader->info.execMode == READER_EXEC_ROWS),
|
.rspRows = (pReader->info.execMode == READER_EXEC_ROWS),
|
||||||
|
@ -2193,8 +2223,9 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
}
|
}
|
||||||
|
|
||||||
pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA;
|
pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA;
|
||||||
pScanInfo->sttKeyInfo.nextProcKey =
|
|
||||||
ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey;
|
// todo set the primary key value
|
||||||
|
pScanInfo->sttKeyInfo.nextProcKey.ts = asc ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey;
|
||||||
hasData = (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
|
hasData = (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
|
||||||
} else { // not clean stt blocks
|
} else { // not clean stt blocks
|
||||||
INIT_TIMEWINDOW(&pScanInfo->sttWindow); //reset the time window
|
INIT_TIMEWINDOW(&pScanInfo->sttWindow); //reset the time window
|
||||||
|
@ -2755,7 +2786,7 @@ static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInf
|
||||||
|
|
||||||
setComposedBlockFlag(pReader, true);
|
setComposedBlockFlag(pReader, true);
|
||||||
|
|
||||||
pScanInfo->sttKeyInfo.nextProcKey = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1;
|
pScanInfo->sttKeyInfo.nextProcKey.ts = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1;
|
||||||
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
|
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
|
||||||
|
|
||||||
pScanInfo->lastProcKey.ts = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey;
|
pScanInfo->lastProcKey.ts = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey;
|
||||||
|
@ -2915,7 +2946,7 @@ static bool notOverlapWithFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanI
|
||||||
if ((!hasDataInSttBlock(pScanInfo)) || (pScanInfo->cleanSttBlocks == true)) {
|
if ((!hasDataInSttBlock(pScanInfo)) || (pScanInfo->cleanSttBlocks == true)) {
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
|
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey.ts;
|
||||||
return (asc && pBlockInfo->lastKey < keyInStt) || (!asc && pBlockInfo->firstKey > keyInStt);
|
return (asc && pBlockInfo->lastKey < keyInStt) || (!asc && pBlockInfo->firstKey > keyInStt);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2963,7 +2994,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
|
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
|
||||||
} else {
|
} else {
|
||||||
if (notOverlapWithFiles(pBlockInfo, pScanInfo, asc)) {
|
if (notOverlapWithFiles(pBlockInfo, pScanInfo, asc)) {
|
||||||
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
|
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey.ts;
|
||||||
|
|
||||||
if ((!hasDataInSttBlock(pScanInfo)) || (asc && pBlockInfo->lastKey < keyInStt) ||
|
if ((!hasDataInSttBlock(pScanInfo)) || (asc && pBlockInfo->lastKey < keyInStt) ||
|
||||||
(!asc && pBlockInfo->firstKey > keyInStt)) {
|
(!asc && pBlockInfo->firstKey > keyInStt)) {
|
||||||
|
@ -3649,7 +3680,7 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI
|
||||||
} else {
|
} else {
|
||||||
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
|
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
|
||||||
pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline),
|
pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline),
|
||||||
pScanInfo->sttKeyInfo.nextProcKey, idStr);
|
pScanInfo->sttKeyInfo.nextProcKey.ts, idStr);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -130,25 +130,46 @@ STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, c
|
||||||
return *p;
|
return *p;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t initSRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len) {
|
||||||
|
pKey->numOfPKs = numOfPks;
|
||||||
|
pKey->ts = ts;
|
||||||
|
|
||||||
|
if (numOfPks > 0) {
|
||||||
|
pKey->pks[0].type = type;
|
||||||
|
if (IS_NUMERIC_TYPE(pKey->pks[0].type)) {
|
||||||
|
pKey->pks[0].val = INT64_MIN;
|
||||||
|
} else {
|
||||||
|
pKey->pks[0].pData = taosMemoryCalloc(1, len);
|
||||||
|
pKey->pks[0].nData = 0;
|
||||||
|
|
||||||
|
if (pKey->pks[0].pData == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) {
|
static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) {
|
||||||
|
int32_t numOfPks = pReader->suppInfo.numOfPks;
|
||||||
|
|
||||||
SRowKey* pRowKey = &pScanInfo->lastProcKey;
|
SRowKey* pRowKey = &pScanInfo->lastProcKey;
|
||||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||||
int64_t skey = pReader->info.window.skey;
|
int64_t skey = pReader->info.window.skey;
|
||||||
pRowKey->ts = (skey > INT64_MIN) ? (skey - 1) : skey;
|
int64_t ts = (skey > INT64_MIN) ? (skey - 1) : skey;
|
||||||
pScanInfo->sttKeyInfo.nextProcKey = skey;
|
|
||||||
|
initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes);
|
||||||
|
initSRowKey(&pScanInfo->sttKeyInfo.nextProcKey, skey, numOfPks, pReader->suppInfo.pk.type,
|
||||||
|
pReader->suppInfo.pk.bytes);
|
||||||
} else {
|
} else {
|
||||||
int64_t ekey = pReader->info.window.ekey;
|
int64_t ekey = pReader->info.window.ekey;
|
||||||
pRowKey->ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
int64_t ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
||||||
pScanInfo->sttKeyInfo.nextProcKey = ekey;
|
|
||||||
}
|
|
||||||
|
|
||||||
// only handle the first primary key.
|
initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes);
|
||||||
pRowKey->numOfPKs = pReader->suppInfo.numOfPks;
|
initSRowKey(&pScanInfo->sttKeyInfo.nextProcKey, ekey, numOfPks, pReader->suppInfo.pk.type,
|
||||||
if (pReader->suppInfo.numOfPks > 0) {
|
pReader->suppInfo.pk.bytes);
|
||||||
if (IS_VAR_DATA_TYPE(pReader->suppInfo.pk.type)) {
|
|
||||||
pRowKey->pks[0].pData = taosMemoryCalloc(1, pReader->suppInfo.pk.bytes);
|
|
||||||
}
|
|
||||||
pRowKey->pks[0].type = pReader->suppInfo.pk.type;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -230,7 +251,7 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
|
||||||
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
|
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
|
||||||
pInfo->lastProcKey.ts = ts;
|
pInfo->lastProcKey.ts = ts;
|
||||||
// todo check the nextProcKey info
|
// todo check the nextProcKey info
|
||||||
pInfo->sttKeyInfo.nextProcKey = ts + step;
|
pInfo->sttKeyInfo.nextProcKey.ts = ts + step;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -77,7 +77,9 @@ typedef enum ESttKeyStatus {
|
||||||
|
|
||||||
typedef struct SSttKeyInfo {
|
typedef struct SSttKeyInfo {
|
||||||
ESttKeyStatus status; // this value should be updated when switch to the next fileset
|
ESttKeyStatus status; // this value should be updated when switch to the next fileset
|
||||||
int64_t nextProcKey; // todo remove this attribute, since it is impossible to set correct nextProcKey value
|
SRowKey nextProcKey;
|
||||||
|
// int64_t nextProcKey; // todo remove this attribute, since it is impossible to set correct nextProcKey
|
||||||
|
// value
|
||||||
} SSttKeyInfo;
|
} SSttKeyInfo;
|
||||||
|
|
||||||
// clean stt file blocks:
|
// clean stt file blocks:
|
||||||
|
@ -333,6 +335,7 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra
|
||||||
const char* pstr);
|
const char* pstr);
|
||||||
bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, int32_t order);
|
bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, int32_t order);
|
||||||
bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order);
|
bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order);
|
||||||
|
int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SArray* pTombData;
|
SArray* pTombData;
|
||||||
|
|
|
@ -1586,12 +1586,26 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
||||||
} else {
|
} else {
|
||||||
int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
|
int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
|
||||||
SRow **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
|
SRow **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
|
||||||
|
SRowKey lastRowKey;
|
||||||
for (int32_t iRow = 0; iRow < nRow; ++iRow) {
|
for (int32_t iRow = 0; iRow < nRow; ++iRow) {
|
||||||
if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) {
|
if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey) {
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);
|
vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
if (iRow == 0) {
|
||||||
|
tRowGetKey(aRow[iRow], &lastRowKey);
|
||||||
|
} else {
|
||||||
|
SRowKey rowKey;
|
||||||
|
tRowGetKey(aRow[iRow], &rowKey);
|
||||||
|
|
||||||
|
if (tRowKeyCompare(&lastRowKey, &rowKey) >= 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
lastRowKey = rowKey;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1735,10 +1749,14 @@ _exit:
|
||||||
atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows);
|
atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows);
|
||||||
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
|
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
|
||||||
|
|
||||||
if(tsEnableMonitor && pSubmitRsp->affectedRows > 0 && strlen(pOriginalMsg->info.conn.user) > 0){
|
if (tsEnableMonitor && pSubmitRsp->affectedRows > 0 && strlen(pOriginalMsg->info.conn.user) > 0) {
|
||||||
const char *sample_labels[] = {VNODE_METRIC_TAG_VALUE_INSERT_AFFECTED_ROWS, pVnode->monitor.strClusterId,
|
const char *sample_labels[] = {VNODE_METRIC_TAG_VALUE_INSERT_AFFECTED_ROWS,
|
||||||
pVnode->monitor.strDnodeId, tsLocalEp, pVnode->monitor.strVgId,
|
pVnode->monitor.strClusterId,
|
||||||
pOriginalMsg->info.conn.user, "Success"};
|
pVnode->monitor.strDnodeId,
|
||||||
|
tsLocalEp,
|
||||||
|
pVnode->monitor.strVgId,
|
||||||
|
pOriginalMsg->info.conn.user,
|
||||||
|
"Success"};
|
||||||
taos_counter_add(pVnode->monitor.insertCounter, pSubmitRsp->affectedRows, sample_labels);
|
taos_counter_add(pVnode->monitor.insertCounter, pSubmitRsp->affectedRows, sample_labels);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -144,7 +144,7 @@ int32_t irateFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t irateFunctionMerge(SqlFunctionCtx* pCtx);
|
int32_t irateFunctionMerge(SqlFunctionCtx* pCtx);
|
||||||
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
int32_t iratePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t iratePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
int32_t getIrateInfoSize();
|
int32_t getIrateInfoSize(int32_t pkBytes);
|
||||||
|
|
||||||
int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx);
|
int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx);
|
||||||
|
|
||||||
|
|
|
@ -1638,7 +1638,8 @@ static int32_t translateIrateImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t l
|
||||||
if (!IS_NUMERIC_TYPE(colType)) {
|
if (!IS_NUMERIC_TYPE(colType)) {
|
||||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
}
|
}
|
||||||
pFunc->node.resType = (SDataType){.bytes = getIrateInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
|
int32_t pkBytes = (pFunc->hasPk) ? pFunc->pkBytes : 0;
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = getIrateInfoSize(pkBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
|
||||||
} else {
|
} else {
|
||||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
|
|
@ -269,6 +269,12 @@ typedef struct SRateInfo {
|
||||||
double lastValue;
|
double lastValue;
|
||||||
TSKEY lastKey;
|
TSKEY lastKey;
|
||||||
int8_t hasResult; // flag to denote has value
|
int8_t hasResult; // flag to denote has value
|
||||||
|
|
||||||
|
char* firstPk;
|
||||||
|
char* lastPk;
|
||||||
|
int8_t pkType;
|
||||||
|
int32_t pkBytes;
|
||||||
|
char pkData[];
|
||||||
} SRateInfo;
|
} SRateInfo;
|
||||||
|
|
||||||
typedef struct SGroupKeyInfo {
|
typedef struct SGroupKeyInfo {
|
||||||
|
@ -2337,6 +2343,11 @@ EFuncDataRequired firstDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo) {
|
||||||
|
|
||||||
SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry);
|
SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry);
|
||||||
if (pResult->hasResult) {
|
if (pResult->hasResult) {
|
||||||
|
if (pResult->pkBytes > 0) {
|
||||||
|
pResult->pkData = pResult->buf + pResult->bytes;
|
||||||
|
} else {
|
||||||
|
pResult->pkData = NULL;
|
||||||
|
}
|
||||||
if (pResult->ts < pBlockInfo->window.skey) {
|
if (pResult->ts < pBlockInfo->window.skey) {
|
||||||
return FUNC_DATA_REQUIRED_NOT_LOAD;
|
return FUNC_DATA_REQUIRED_NOT_LOAD;
|
||||||
} else if (pResult->ts == pBlockInfo->window.skey && pResult->pkData) {
|
} else if (pResult->ts == pBlockInfo->window.skey && pResult->pkData) {
|
||||||
|
@ -2360,6 +2371,11 @@ EFuncDataRequired lastDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo) {
|
||||||
|
|
||||||
SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry);
|
SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry);
|
||||||
if (pResult->hasResult) {
|
if (pResult->hasResult) {
|
||||||
|
if (pResult->pkBytes > 0) {
|
||||||
|
pResult->pkData = pResult->buf + pResult->bytes;
|
||||||
|
} else {
|
||||||
|
pResult->pkData = NULL;
|
||||||
|
}
|
||||||
if (pResult->ts > pBlockInfo->window.ekey) {
|
if (pResult->ts > pBlockInfo->window.ekey) {
|
||||||
return FUNC_DATA_REQUIRED_NOT_LOAD;
|
return FUNC_DATA_REQUIRED_NOT_LOAD;
|
||||||
} else if (pResult->ts == pBlockInfo->window.ekey && pResult->pkData) {
|
} else if (pResult->ts == pBlockInfo->window.ekey && pResult->pkData) {
|
||||||
|
@ -6165,10 +6181,11 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getIrateInfoSize() { return (int32_t)sizeof(SRateInfo); }
|
int32_t getIrateInfoSize(int32_t pkBytes) { return (int32_t)sizeof(SRateInfo) + 2 * pkBytes; }
|
||||||
|
|
||||||
bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(SRateInfo);
|
int32_t pkBytes = (pFunc->hasPk) ? pFunc->pkBytes : 0;
|
||||||
|
pEnv->calcMemSize = getIrateInfoSize(pkBytes);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6188,6 +6205,36 @@ bool irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void doSaveRateInfo(SRateInfo* pRateInfo, bool isFirst, int64_t ts, char* pk, double v) {
|
||||||
|
if (isFirst) {
|
||||||
|
pRateInfo->firstValue = v;
|
||||||
|
pRateInfo->firstKey = ts;
|
||||||
|
if (pRateInfo->firstPk) {
|
||||||
|
int32_t pkBytes = IS_VAR_DATA_TYPE(pRateInfo->pkType) ? varDataTLen(pk) : pRateInfo->pkBytes;
|
||||||
|
memcpy(pRateInfo->firstPk, pk, pkBytes);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pRateInfo->lastValue = v;
|
||||||
|
pRateInfo->lastKey = ts;
|
||||||
|
if (pRateInfo->lastPk) {
|
||||||
|
int32_t pkBytes = IS_VAR_DATA_TYPE(pRateInfo->pkType) ? varDataTLen(pk) : pRateInfo->pkBytes;
|
||||||
|
memcpy(pRateInfo->lastPk, pk, pkBytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void initializeRateInfo(SqlFunctionCtx* pCtx, SRateInfo* pRateInfo) {
|
||||||
|
if (pCtx->hasPrimaryKey) {
|
||||||
|
pRateInfo->pkType = pCtx->input.pPrimaryKey->info.type;
|
||||||
|
pRateInfo->pkBytes = pCtx->input.pPrimaryKey->info.bytes;
|
||||||
|
pRateInfo->firstPk = pRateInfo->pkData;
|
||||||
|
pRateInfo->lastPk = pRateInfo->pkData + pRateInfo->pkBytes;
|
||||||
|
} else {
|
||||||
|
pRateInfo->firstPk = NULL;
|
||||||
|
pRateInfo->lastPk = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t irateFunction(SqlFunctionCtx* pCtx) {
|
int32_t irateFunction(SqlFunctionCtx* pCtx) {
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
SRateInfo* pRateInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
SRateInfo* pRateInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
@ -6199,6 +6246,8 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
funcInputUpdate(pCtx);
|
funcInputUpdate(pCtx);
|
||||||
|
|
||||||
|
initializeRateInfo(pCtx, pRateInfo);
|
||||||
|
|
||||||
int32_t numOfElems = 0;
|
int32_t numOfElems = 0;
|
||||||
int32_t type = pInputCol->info.type;
|
int32_t type = pInputCol->info.type;
|
||||||
SFuncInputRow row = {0};
|
SFuncInputRow row = {0};
|
||||||
|
@ -6212,21 +6261,16 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) {
|
||||||
GET_TYPED_DATA(v, double, type, data);
|
GET_TYPED_DATA(v, double, type, data);
|
||||||
|
|
||||||
if (INT64_MIN == pRateInfo->lastKey) {
|
if (INT64_MIN == pRateInfo->lastKey) {
|
||||||
pRateInfo->lastValue = v;
|
doSaveRateInfo(pRateInfo, false, row.ts, row.pPk, v);
|
||||||
pRateInfo->lastKey = row.ts;
|
|
||||||
pRateInfo->hasResult = 1;
|
pRateInfo->hasResult = 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (row.ts > pRateInfo->lastKey) {
|
if (row.ts > pRateInfo->lastKey) {
|
||||||
if ((INT64_MIN == pRateInfo->firstKey) || pRateInfo->lastKey > pRateInfo->firstKey) {
|
if ((INT64_MIN == pRateInfo->firstKey) || pRateInfo->lastKey > pRateInfo->firstKey) {
|
||||||
pRateInfo->firstValue = pRateInfo->lastValue;
|
doSaveRateInfo(pRateInfo, true, pRateInfo->lastKey, pRateInfo->lastPk, pRateInfo->lastValue);
|
||||||
pRateInfo->firstKey = pRateInfo->lastKey;
|
|
||||||
}
|
}
|
||||||
|
doSaveRateInfo(pRateInfo, false, row.ts, row.pPk, v);
|
||||||
pRateInfo->lastValue = v;
|
|
||||||
pRateInfo->lastKey = row.ts;
|
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
} else if (row.ts == pRateInfo->lastKey) {
|
} else if (row.ts == pRateInfo->lastKey) {
|
||||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||||
|
@ -6234,8 +6278,7 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
|
|
||||||
if ((INT64_MIN == pRateInfo->firstKey) || row.ts > pRateInfo->firstKey) {
|
if ((INT64_MIN == pRateInfo->firstKey) || row.ts > pRateInfo->firstKey) {
|
||||||
pRateInfo->firstValue = v;
|
doSaveRateInfo(pRateInfo, true, row.ts, row.pPk, v);
|
||||||
pRateInfo->firstKey = row.ts;
|
|
||||||
} else if (row.ts == pRateInfo->firstKey) {
|
} else if (row.ts == pRateInfo->firstKey) {
|
||||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||||
}
|
}
|
||||||
|
@ -6271,25 +6314,26 @@ static double doCalcRate(const SRateInfo* pRateInfo, double tickPerSec) {
|
||||||
|
|
||||||
static void irateTransferInfoImpl(TSKEY inputKey, SRateInfo* pInput, SRateInfo* pOutput, bool isFirstKey) {
|
static void irateTransferInfoImpl(TSKEY inputKey, SRateInfo* pInput, SRateInfo* pOutput, bool isFirstKey) {
|
||||||
if (inputKey > pOutput->lastKey) {
|
if (inputKey > pOutput->lastKey) {
|
||||||
pOutput->firstKey = pOutput->lastKey;
|
doSaveRateInfo(pOutput, true, pOutput->lastKey, pOutput->lastPk, pOutput->lastValue);
|
||||||
pOutput->firstValue = pOutput->lastValue;
|
if (isFirstKey) {
|
||||||
|
doSaveRateInfo(pOutput, false, pInput->firstKey, pInput->firstPk, pInput->firstValue);
|
||||||
pOutput->lastKey = isFirstKey ? pInput->firstKey : pInput->lastKey;
|
} else {
|
||||||
pOutput->lastValue = isFirstKey ? pInput->firstValue : pInput->lastValue;
|
doSaveRateInfo(pOutput, false, pInput->lastKey, pInput->lastPk, pInput->lastValue);
|
||||||
|
}
|
||||||
} else if ((inputKey < pOutput->lastKey) && (inputKey > pOutput->firstKey)) {
|
} else if ((inputKey < pOutput->lastKey) && (inputKey > pOutput->firstKey)) {
|
||||||
pOutput->firstKey = isFirstKey ? pInput->firstKey : pInput->lastKey;
|
if (isFirstKey) {
|
||||||
pOutput->firstValue = isFirstKey ? pInput->firstValue : pInput->lastValue;
|
doSaveRateInfo(pOutput, true, pInput->firstKey, pInput->firstPk, pInput->firstValue);
|
||||||
|
} else {
|
||||||
|
doSaveRateInfo(pOutput, true, pInput->lastKey, pInput->lastPk, pInput->lastValue);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// inputKey < pOutput->firstKey
|
// inputKey < pOutput->firstKey
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void irateCopyInfo(SRateInfo* pInput, SRateInfo* pOutput) {
|
static void irateCopyInfo(SRateInfo* pInput, SRateInfo* pOutput) {
|
||||||
pOutput->firstKey = pInput->firstKey;
|
doSaveRateInfo(pOutput, true, pInput->firstKey, pInput->firstPk, pInput->firstValue);
|
||||||
pOutput->lastKey = pInput->lastKey;
|
doSaveRateInfo(pOutput, false, pInput->lastKey, pInput->lastPk, pInput->lastValue);
|
||||||
|
|
||||||
pOutput->firstValue = pInput->firstValue;
|
|
||||||
pOutput->lastValue = pInput->lastValue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t irateTransferInfo(SRateInfo* pInput, SRateInfo* pOutput) {
|
static int32_t irateTransferInfo(SRateInfo* pInput, SRateInfo* pOutput) {
|
||||||
|
@ -6324,11 +6368,13 @@ int32_t irateFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
initializeRateInfo(pCtx, pInfo);
|
||||||
|
|
||||||
int32_t start = pInput->startRowIndex;
|
int32_t start = pInput->startRowIndex;
|
||||||
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
|
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
|
||||||
char* data = colDataGetData(pCol, i);
|
char* data = colDataGetData(pCol, i);
|
||||||
SRateInfo* pInputInfo = (SRateInfo*)varDataVal(data);
|
SRateInfo* pInputInfo = (SRateInfo*)varDataVal(data);
|
||||||
|
initializeRateInfo(pCtx, pInfo);
|
||||||
if (pInputInfo->hasResult) {
|
if (pInputInfo->hasResult) {
|
||||||
int32_t code = irateTransferInfo(pInputInfo, pInfo);
|
int32_t code = irateTransferInfo(pInputInfo, pInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -6347,7 +6393,7 @@ int32_t irateFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||||
int32_t iratePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t iratePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
int32_t resultBytes = getIrateInfoSize();
|
int32_t resultBytes = getIrateInfoSize(pInfo->pkBytes);
|
||||||
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||||
|
|
||||||
memcpy(varDataVal(res), pInfo, resultBytes);
|
memcpy(varDataVal(res), pInfo, resultBytes);
|
||||||
|
|
|
@ -415,6 +415,8 @@ static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNod
|
||||||
int32_t len = snprintf(name, sizeof(name) - 1, "%s.%p", (*pPartialFunc)->functionName, pSrcFunc);
|
int32_t len = snprintf(name, sizeof(name) - 1, "%s.%p", (*pPartialFunc)->functionName, pSrcFunc);
|
||||||
taosCreateMD5Hash(name, len);
|
taosCreateMD5Hash(name, len);
|
||||||
strncpy((*pPartialFunc)->node.aliasName, name, TSDB_COL_NAME_LEN - 1);
|
strncpy((*pPartialFunc)->node.aliasName, name, TSDB_COL_NAME_LEN - 1);
|
||||||
|
(*pPartialFunc)->hasPk = pSrcFunc->hasPk;
|
||||||
|
(*pPartialFunc)->pkBytes = pSrcFunc->pkBytes;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -453,7 +455,8 @@ static int32_t createMidFunction(const SFunctionNode* pSrcFunc, const SFunctionN
|
||||||
} else {
|
} else {
|
||||||
nodesDestroyList(pParameterList);
|
nodesDestroyList(pParameterList);
|
||||||
}
|
}
|
||||||
|
(*pMidFunc)->hasPk = pPartialFunc->hasPk;
|
||||||
|
(*pMidFunc)->pkBytes = pPartialFunc->pkBytes;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -482,7 +485,8 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
|
||||||
} else {
|
} else {
|
||||||
nodesDestroyList(pParameterList);
|
nodesDestroyList(pParameterList);
|
||||||
}
|
}
|
||||||
|
(*pMergeFunc)->hasPk = pPartialFunc->hasPk;
|
||||||
|
(*pMergeFunc)->pkBytes = pPartialFunc->pkBytes;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -45,8 +45,8 @@ int16_t insFindCol(struct SToken *pColname, int16_t start, int16_t end, SSchema
|
||||||
void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag, int64_t suid, const char *sname,
|
void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag, int64_t suid, const char *sname,
|
||||||
SArray *tagName, uint8_t tagNum, int32_t ttl);
|
SArray *tagName, uint8_t tagNum, int32_t ttl);
|
||||||
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo);
|
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo);
|
||||||
void insInitColValues(STableMeta* pTableMeta, SArray* aColValues);
|
void insInitColValues(STableMeta *pTableMeta, SArray *aColValues);
|
||||||
void insCheckTableDataOrder(STableDataCxt *pTableCxt, TSKEY tsKey);
|
void insCheckTableDataOrder(STableDataCxt *pTableCxt, SRowKey *rowKey);
|
||||||
int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
|
int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
|
||||||
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode, bool ignoreColVals);
|
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode, bool ignoreColVals);
|
||||||
int32_t initTableColSubmitData(STableDataCxt *pTableCxt);
|
int32_t initTableColSubmitData(STableDataCxt *pTableCxt);
|
||||||
|
|
|
@ -113,7 +113,8 @@ static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchem
|
||||||
SSchema* pTagSchema = &pSchema[tags->pColIndex[i]];
|
SSchema* pTagSchema = &pSchema[tags->pColIndex[i]];
|
||||||
SSmlKv* kv = taosArrayGet(cols, i);
|
SSmlKv* kv = taosArrayGet(cols, i);
|
||||||
|
|
||||||
if(kv->keyLen != strlen(pTagSchema->name) || memcmp(kv->key, pTagSchema->name, kv->keyLen) != 0 || kv->type != pTagSchema->type){
|
if (kv->keyLen != strlen(pTagSchema->name) || memcmp(kv->key, pTagSchema->name, kv->keyLen) != 0 ||
|
||||||
|
kv->type != pTagSchema->type) {
|
||||||
code = TSDB_CODE_SML_INVALID_DATA;
|
code = TSDB_CODE_SML_INVALID_DATA;
|
||||||
uError("SML smlBuildTagRow error col not same %s", pTagSchema->name);
|
uError("SML smlBuildTagRow error col not same %s", pTagSchema->name);
|
||||||
goto end;
|
goto end;
|
||||||
|
@ -200,7 +201,9 @@ int32_t smlBuildRow(STableDataCxt* pTableCxt) {
|
||||||
if (TSDB_CODE_SUCCESS != ret) {
|
if (TSDB_CODE_SUCCESS != ret) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow));
|
SRowKey key;
|
||||||
|
tRowGetKey(*pRow, &key);
|
||||||
|
insCheckTableDataOrder(pTableCxt, &key);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -209,15 +212,16 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32
|
||||||
SSchema* pColSchema = schema + index;
|
SSchema* pColSchema = schema + index;
|
||||||
SColVal* pVal = taosArrayGet(pTableCxt->pValues, index);
|
SColVal* pVal = taosArrayGet(pTableCxt->pValues, index);
|
||||||
SSmlKv* kv = (SSmlKv*)data;
|
SSmlKv* kv = (SSmlKv*)data;
|
||||||
if(kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 || kv->type != pColSchema->type){
|
if (kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 ||
|
||||||
|
kv->type != pColSchema->type) {
|
||||||
ret = TSDB_CODE_SML_INVALID_DATA;
|
ret = TSDB_CODE_SML_INVALID_DATA;
|
||||||
char* tmp = taosMemoryCalloc(kv->keyLen + 1, 1);
|
char* tmp = taosMemoryCalloc(kv->keyLen + 1, 1);
|
||||||
if(tmp){
|
if (tmp) {
|
||||||
memcpy(tmp, kv->key, kv->keyLen);
|
memcpy(tmp, kv->key, kv->keyLen);
|
||||||
uInfo("SML data(name:%s type:%s) is not same like the db data(name:%s type:%s)",
|
uInfo("SML data(name:%s type:%s) is not same like the db data(name:%s type:%s)", tmp, tDataTypes[kv->type].name,
|
||||||
tmp, tDataTypes[kv->type].name, pColSchema->name, tDataTypes[pColSchema->type].name);
|
pColSchema->name, tDataTypes[pColSchema->type].name);
|
||||||
taosMemoryFree(tmp);
|
taosMemoryFree(tmp);
|
||||||
}else{
|
} else {
|
||||||
uError("SML smlBuildCol out of memory");
|
uError("SML smlBuildCol out of memory");
|
||||||
}
|
}
|
||||||
goto end;
|
goto end;
|
||||||
|
@ -225,7 +229,7 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32
|
||||||
if (kv->type == TSDB_DATA_TYPE_NCHAR) {
|
if (kv->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
int64_t size = pColSchema->bytes - VARSTR_HEADER_SIZE;
|
int64_t size = pColSchema->bytes - VARSTR_HEADER_SIZE;
|
||||||
if(size <= 0){
|
if (size <= 0) {
|
||||||
ret = TSDB_CODE_SML_INVALID_DATA;
|
ret = TSDB_CODE_SML_INVALID_DATA;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -351,7 +355,7 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
SSmlKv* kv = *(SSmlKv**)p;
|
SSmlKv* kv = *(SSmlKv**)p;
|
||||||
if(kv->type != pColSchema->type){
|
if (kv->type != pColSchema->type) {
|
||||||
ret = buildInvalidOperationMsg(&pBuf, "kv type not equal to col type");
|
ret = buildInvalidOperationMsg(&pBuf, "kv type not equal to col type");
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -367,7 +371,8 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
|
||||||
}
|
}
|
||||||
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
|
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
|
||||||
if (errno == E2BIG) {
|
if (errno == E2BIG) {
|
||||||
uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d, kv->value:%s", (int)kv->length, pColSchema->bytes, kv->value);
|
uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d, kv->value:%s", (int)kv->length,
|
||||||
|
pColSchema->bytes, kv->value);
|
||||||
buildInvalidOperationMsg(&pBuf, "value too long");
|
buildInvalidOperationMsg(&pBuf, "value too long");
|
||||||
ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
|
ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
|
||||||
goto end;
|
goto end;
|
||||||
|
@ -396,7 +401,9 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
|
||||||
buildInvalidOperationMsg(&pBuf, "tRowBuild error");
|
buildInvalidOperationMsg(&pBuf, "tRowBuild error");
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow));
|
SRowKey key;
|
||||||
|
tRowGetKey(*pRow, &key);
|
||||||
|
insCheckTableDataOrder(pTableCxt, &key);
|
||||||
clearColValArraySml(pTableCxt->pValues);
|
clearColValArraySml(pTableCxt->pValues);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,12 +13,12 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "geosWrapper.h"
|
||||||
#include "parInsertUtil.h"
|
#include "parInsertUtil.h"
|
||||||
#include "parToken.h"
|
#include "parToken.h"
|
||||||
#include "scalar.h"
|
#include "scalar.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
#include "geosWrapper.h"
|
|
||||||
|
|
||||||
typedef struct SInsertParseContext {
|
typedef struct SInsertParseContext {
|
||||||
SParseContext* pComCxt;
|
SParseContext* pComCxt;
|
||||||
|
@ -154,19 +154,15 @@ static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModify
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef enum {
|
typedef enum { BOUND_TAGS, BOUND_COLUMNS, BOUND_ALL_AND_TBNAME } EBoundColumnsType;
|
||||||
BOUND_TAGS,
|
|
||||||
BOUND_COLUMNS,
|
|
||||||
BOUND_ALL_AND_TBNAME
|
|
||||||
} EBoundColumnsType;
|
|
||||||
|
|
||||||
static int32_t getTbnameSchemaIndex(STableMeta* pTableMeta) {
|
static int32_t getTbnameSchemaIndex(STableMeta* pTableMeta) {
|
||||||
return pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns;
|
return pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns;
|
||||||
}
|
}
|
||||||
|
|
||||||
// pStmt->pSql -> field1_name, ...)
|
// pStmt->pSql -> field1_name, ...)
|
||||||
static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, EBoundColumnsType boundColsType, STableMeta* pTableMeta,
|
static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, EBoundColumnsType boundColsType,
|
||||||
SBoundColInfo* pBoundInfo) {
|
STableMeta* pTableMeta, SBoundColInfo* pBoundInfo) {
|
||||||
SSchema* pSchema = NULL;
|
SSchema* pSchema = NULL;
|
||||||
if (boundColsType == BOUND_TAGS) {
|
if (boundColsType == BOUND_TAGS) {
|
||||||
pSchema = getTableTagSchema(pTableMeta);
|
pSchema = getTableTagSchema(pTableMeta);
|
||||||
|
@ -202,8 +198,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, E
|
||||||
token.z = tmpTokenBuf;
|
token.z = tmpTokenBuf;
|
||||||
token.n = strdequote(token.z);
|
token.n = strdequote(token.z);
|
||||||
|
|
||||||
if (boundColsType == BOUND_ALL_AND_TBNAME &&
|
if (boundColsType == BOUND_ALL_AND_TBNAME && token.n == strlen("tbname") && (strcasecmp(token.z, "tbname") == 0)) {
|
||||||
token.n == strlen("tbname") && (strcasecmp(token.z, "tbname") == 0)) {
|
|
||||||
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = tbnameSchemaIndex;
|
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = tbnameSchemaIndex;
|
||||||
pUseCols[tbnameSchemaIndex] = true;
|
pUseCols[tbnameSchemaIndex] = true;
|
||||||
++pBoundInfo->numOfBound;
|
++pBoundInfo->numOfBound;
|
||||||
|
@ -230,7 +225,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, E
|
||||||
if (TSDB_CODE_SUCCESS == code && (BOUND_TAGS != boundColsType) && !pUseCols[0]) {
|
if (TSDB_CODE_SUCCESS == code && (BOUND_TAGS != boundColsType) && !pUseCols[0]) {
|
||||||
code = buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column can not be null");
|
code = buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column can not be null");
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code && (BOUND_ALL_AND_TBNAME == boundColsType) &&!pUseCols[tbnameSchemaIndex]) {
|
if (TSDB_CODE_SUCCESS == code && (BOUND_ALL_AND_TBNAME == boundColsType) && !pUseCols[tbnameSchemaIndex]) {
|
||||||
code = buildInvalidOperationMsg(&pCxt->msg, "tbname column can not be null");
|
code = buildInvalidOperationMsg(&pCxt->msg, "tbname column can not be null");
|
||||||
}
|
}
|
||||||
taosMemoryFree(pUseCols);
|
taosMemoryFree(pUseCols);
|
||||||
|
@ -288,7 +283,8 @@ static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t
|
||||||
bool firstIsTS = false, secondIsTs = false;
|
bool firstIsTS = false, secondIsTs = false;
|
||||||
const char* pTokenEnd = *end;
|
const char* pTokenEnd = *end;
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != parseTimestampOrInterval(&pTokenEnd, pToken, timePrec, &ts, &interval, pMsgBuf, &firstIsTS)) {
|
if (TSDB_CODE_SUCCESS !=
|
||||||
|
parseTimestampOrInterval(&pTokenEnd, pToken, timePrec, &ts, &interval, pMsgBuf, &firstIsTS)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -330,8 +326,7 @@ static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t
|
||||||
if (pTokenEnd[i] == ' ' || pTokenEnd[i] == '\t') {
|
if (pTokenEnd[i] == ' ' || pTokenEnd[i] == '\t') {
|
||||||
i++;
|
i++;
|
||||||
continue;
|
continue;
|
||||||
}
|
} else if (pTokenEnd[i] == ',' || pTokenEnd[i] == ')') {
|
||||||
else if (pTokenEnd[i] == ',' || pTokenEnd[i] == ')') {
|
|
||||||
*end = pTokenEnd + i;
|
*end = pTokenEnd + i;
|
||||||
if (!firstIsTS) {
|
if (!firstIsTS) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
|
||||||
|
@ -362,7 +357,8 @@ static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t
|
||||||
valueToken.n = len;
|
valueToken.n = len;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != parseTimestampOrInterval(&pTokenEnd, &valueToken, timePrec, &tempTs, &tempInterval, pMsgBuf, &secondIsTs)) {
|
if (TSDB_CODE_SUCCESS !=
|
||||||
|
parseTimestampOrInterval(&pTokenEnd, &valueToken, timePrec, &tempTs, &tempInterval, pMsgBuf, &secondIsTs)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -376,7 +372,7 @@ static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
|
||||||
}
|
}
|
||||||
ts = tempTs;
|
ts = tempTs;
|
||||||
}else {
|
} else {
|
||||||
// not support operator between tow interval, such as 2h + 3s
|
// not support operator between tow interval, such as 2h + 3s
|
||||||
if (!firstIsTS) {
|
if (!firstIsTS) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
|
||||||
|
@ -413,7 +409,7 @@ static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t
|
||||||
}
|
}
|
||||||
|
|
||||||
// need to call geosFreeBuffer(*output) later
|
// need to call geosFreeBuffer(*output) later
|
||||||
static int parseGeometry(SToken *pToken, unsigned char **output, size_t *size) {
|
static int parseGeometry(SToken* pToken, unsigned char** output, size_t* size) {
|
||||||
int32_t code = TSDB_CODE_FAILED;
|
int32_t code = TSDB_CODE_FAILED;
|
||||||
|
|
||||||
//[ToDo] support to parse WKB as well as WKT
|
//[ToDo] support to parse WKB as well as WKT
|
||||||
|
@ -432,19 +428,19 @@ static int parseGeometry(SToken *pToken, unsigned char **output, size_t *size) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t parseVarbinary(SToken* pToken, uint8_t **pData, uint32_t *nData, int32_t bytes){
|
static int32_t parseVarbinary(SToken* pToken, uint8_t** pData, uint32_t* nData, int32_t bytes) {
|
||||||
if(pToken->type != TK_NK_STRING){
|
if (pToken->type != TK_NK_STRING) {
|
||||||
return TSDB_CODE_PAR_INVALID_VARBINARY;
|
return TSDB_CODE_PAR_INVALID_VARBINARY;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(isHex(pToken->z + 1, pToken->n - 2)){
|
if (isHex(pToken->z + 1, pToken->n - 2)) {
|
||||||
if(!isValidateHex(pToken->z + 1, pToken->n - 2)){
|
if (!isValidateHex(pToken->z + 1, pToken->n - 2)) {
|
||||||
return TSDB_CODE_PAR_INVALID_VARBINARY;
|
return TSDB_CODE_PAR_INVALID_VARBINARY;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* data = NULL;
|
void* data = NULL;
|
||||||
uint32_t size = 0;
|
uint32_t size = 0;
|
||||||
if(taosHex2Ascii(pToken->z + 1, pToken->n - 2, &data, &size) < 0){
|
if (taosHex2Ascii(pToken->z + 1, pToken->n - 2, &data, &size) < 0) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -454,7 +450,7 @@ static int32_t parseVarbinary(SToken* pToken, uint8_t **pData, uint32_t *nData,
|
||||||
}
|
}
|
||||||
*pData = data;
|
*pData = data;
|
||||||
*nData = size;
|
*nData = size;
|
||||||
}else{
|
} else {
|
||||||
*pData = taosMemoryCalloc(1, pToken->n);
|
*pData = taosMemoryCalloc(1, pToken->n);
|
||||||
int32_t len = trimString(pToken->z, pToken->n, *pData, pToken->n);
|
int32_t len = trimString(pToken->z, pToken->n, *pData, pToken->n);
|
||||||
*nData = len;
|
*nData = len;
|
||||||
|
@ -633,7 +629,7 @@ static int32_t parseTagToken(const char** end, SToken* pToken, SSchema* pSchema,
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_VARBINARY: {
|
case TSDB_DATA_TYPE_VARBINARY: {
|
||||||
code = parseVarbinary(pToken, &val->pData, &val->nData, pSchema->bytes);
|
code = parseVarbinary(pToken, &val->pData, &val->nData, pSchema->bytes);
|
||||||
if(code != TSDB_CODE_SUCCESS){
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return generateSyntaxErrMsg(pMsgBuf, code, pSchema->name);
|
return generateSyntaxErrMsg(pMsgBuf, code, pSchema->name);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -759,8 +755,8 @@ static int32_t buildCreateTbReq(SVnodeModifyOpStmt* pStmt, STag* pTag, SArray* p
|
||||||
int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf, int8_t type) {
|
int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf, int8_t type) {
|
||||||
if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER &&
|
if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER &&
|
||||||
pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL &&
|
pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL &&
|
||||||
pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT &&
|
pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT && pToken->type != TK_NK_BIN &&
|
||||||
pToken->type != TK_NK_BIN && pToken->type != TK_NK_VARIABLE) ||
|
pToken->type != TK_NK_VARIABLE) ||
|
||||||
(pToken->n == 0) || (pToken->type == TK_NK_RP)) {
|
(pToken->n == 0) || (pToken->type == TK_NK_RP)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
|
||||||
}
|
}
|
||||||
|
@ -1114,8 +1110,8 @@ static int32_t checkAuth(SParseContext* pCxt, SName* pTbName, bool* pMissCache,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, STableMeta** pTableMeta,
|
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, STableMeta** pTableMeta, bool* pMissCache,
|
||||||
bool* pMissCache, bool bUsingTable) {
|
bool bUsingTable) {
|
||||||
SParseContext* pComCxt = pCxt->pComCxt;
|
SParseContext* pComCxt = pCxt->pComCxt;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (pComCxt->async) {
|
if (pComCxt->async) {
|
||||||
|
@ -1365,13 +1361,11 @@ static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifyOp
|
||||||
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
|
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
|
||||||
}
|
}
|
||||||
// pStmt->pSql -> field1_name, ...)
|
// pStmt->pSql -> field1_name, ...)
|
||||||
return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_COLUMNS, pStmt->pTableMeta,
|
return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_COLUMNS, pStmt->pTableMeta, &pTableCxt->boundColsInfo);
|
||||||
&pTableCxt->boundColsInfo);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL != pStmt->pBoundCols) {
|
if (NULL != pStmt->pBoundCols) {
|
||||||
return parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_COLUMNS, pStmt->pTableMeta,
|
return parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_COLUMNS, pStmt->pTableMeta, &pTableCxt->boundColsInfo);
|
||||||
&pTableCxt->boundColsInfo);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1695,18 +1689,19 @@ typedef struct SStbRowsDataContext {
|
||||||
bool isJsonTag;
|
bool isJsonTag;
|
||||||
} SStbRowsDataContext;
|
} SStbRowsDataContext;
|
||||||
|
|
||||||
typedef union SRowsDataContext{
|
typedef union SRowsDataContext {
|
||||||
STableDataCxt* pTableDataCxt;
|
STableDataCxt* pTableDataCxt;
|
||||||
SStbRowsDataContext* pStbRowsCxt;
|
SStbRowsDataContext* pStbRowsCxt;
|
||||||
} SRowsDataContext;
|
} SRowsDataContext;
|
||||||
|
|
||||||
static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext* pStbRowsCxt, SToken* pToken, bool* pFoundCtbName) {
|
static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext* pStbRowsCxt, SToken* pToken,
|
||||||
|
bool* pFoundCtbName) {
|
||||||
*pFoundCtbName = false;
|
*pFoundCtbName = false;
|
||||||
int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, TSDB_DATA_TYPE_BINARY);
|
int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, TSDB_DATA_TYPE_BINARY);
|
||||||
if (TK_NK_VARIABLE == pToken->type) {
|
if (TK_NK_VARIABLE == pToken->type) {
|
||||||
code = buildInvalidOperationMsg(&pCxt->msg, "not expected tbname");
|
code = buildInvalidOperationMsg(&pCxt->msg, "not expected tbname");
|
||||||
}
|
}
|
||||||
if (code == TSDB_CODE_SUCCESS){
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
if (isNullValue(TSDB_DATA_TYPE_BINARY, pToken)) {
|
if (isNullValue(TSDB_DATA_TYPE_BINARY, pToken)) {
|
||||||
return buildInvalidOperationMsg(&pCxt->msg, "tbname can not be null value");
|
return buildInvalidOperationMsg(&pCxt->msg, "tbname can not be null value");
|
||||||
}
|
}
|
||||||
|
@ -1733,9 +1728,8 @@ static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext*
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t processCtbTagsAfterCtbName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
|
static int32_t processCtbTagsAfterCtbName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
|
||||||
SStbRowsDataContext* pStbRowsCxt, bool ctbFirst,
|
SStbRowsDataContext* pStbRowsCxt, bool ctbFirst, const SToken* tagTokens,
|
||||||
const SToken* tagTokens, SSchema* const* tagSchemas,
|
SSchema* const* tagSchemas, int numOfTagTokens) {
|
||||||
int numOfTagTokens) {
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
uint8_t precision = pStmt->pTableMeta->tableInfo.precision;
|
uint8_t precision = pStmt->pTableMeta->tableInfo.precision;
|
||||||
|
|
||||||
|
@ -1749,8 +1743,8 @@ static int32_t processCtbTagsAfterCtbName(SInsertParseContext* pCxt, SVnodeModif
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = parseTagValue(&pCxt->msg, NULL, precision, pTagSchema, pTagToken, pStbRowsCxt->aTagNames, pStbRowsCxt->aTagVals,
|
code = parseTagValue(&pCxt->msg, NULL, precision, pTagSchema, pTagToken, pStbRowsCxt->aTagNames,
|
||||||
&pStbRowsCxt->pTag);
|
pStbRowsCxt->aTagVals, &pStbRowsCxt->pTag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (code == TSDB_CODE_SUCCESS && !pStbRowsCxt->isJsonTag) {
|
if (code == TSDB_CODE_SUCCESS && !pStbRowsCxt->isJsonTag) {
|
||||||
|
@ -1765,9 +1759,9 @@ static int32_t processCtbTagsAfterCtbName(SInsertParseContext* pCxt, SVnodeModif
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
|
static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
|
||||||
SStbRowsDataContext* pStbRowsCxt, SToken* pToken,
|
SStbRowsDataContext* pStbRowsCxt, SToken* pToken, const SBoundColInfo* pCols,
|
||||||
const SBoundColInfo* pCols, const SSchema* pSchemas,
|
const SSchema* pSchemas, SToken* tagTokens, SSchema** tagSchemas, int* pNumOfTagTokens,
|
||||||
SToken* tagTokens, SSchema** tagSchemas, int* pNumOfTagTokens, bool* bFoundTbName) {
|
bool* bFoundTbName) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SArray* pTagNames = pStbRowsCxt->aTagNames;
|
SArray* pTagNames = pStbRowsCxt->aTagNames;
|
||||||
SArray* pTagVals = pStbRowsCxt->aTagVals;
|
SArray* pTagVals = pStbRowsCxt->aTagVals;
|
||||||
|
@ -1808,11 +1802,11 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
|
||||||
code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value");
|
code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value");
|
||||||
}
|
}
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = parseTagValue(&pCxt->msg, ppSql, precision, (SSchema*)pTagSchema, pToken, pTagNames, pTagVals, &pStbRowsCxt->pTag);
|
code = parseTagValue(&pCxt->msg, ppSql, precision, (SSchema*)pTagSchema, pToken, pTagNames, pTagVals,
|
||||||
|
&pStbRowsCxt->pTag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
} else if (pCols->pColIndex[i] == tbnameIdx) {
|
||||||
else if (pCols->pColIndex[i] == tbnameIdx) {
|
|
||||||
code = parseTbnameToken(pCxt, pStbRowsCxt, pToken, bFoundTbName);
|
code = parseTbnameToken(pCxt, pStbRowsCxt, pToken, bFoundTbName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1827,8 +1821,7 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
|
static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
|
||||||
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow,
|
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken, bool* pCtbFirst) {
|
||||||
SToken* pToken, bool *pCtbFirst) {
|
|
||||||
SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo;
|
SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo;
|
||||||
SSchema* pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta);
|
SSchema* pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta);
|
||||||
|
|
||||||
|
@ -1840,8 +1833,8 @@ static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
|
||||||
SSchema* tagSchemas[TSDB_MAX_TAGS] = {0};
|
SSchema* tagSchemas[TSDB_MAX_TAGS] = {0};
|
||||||
int numOfTagTokens = 0;
|
int numOfTagTokens = 0;
|
||||||
|
|
||||||
code = doGetStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pToken, pCols, pSchemas, tagTokens,
|
code = doGetStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pToken, pCols, pSchemas, tagTokens, tagSchemas,
|
||||||
tagSchemas, &numOfTagTokens, &bFoundTbName);
|
&numOfTagTokens, &bFoundTbName);
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS && !bFoundTbName) {
|
if (code == TSDB_CODE_SUCCESS && !bFoundTbName) {
|
||||||
code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql);
|
code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql);
|
||||||
|
@ -1870,7 +1863,8 @@ static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SStbRowsDataContext* pStbRowsCxt) {
|
static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
|
||||||
|
SStbRowsDataContext* pStbRowsCxt) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
pStbRowsCxt->pCreateCtbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
|
pStbRowsCxt->pCreateCtbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
|
||||||
|
@ -1878,9 +1872,9 @@ static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnod
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
insBuildCreateTbReq(pStbRowsCxt->pCreateCtbReq, pStbRowsCxt->ctbName.tname, pStbRowsCxt->pTag, pStbRowsCxt->pStbMeta->uid,
|
insBuildCreateTbReq(pStbRowsCxt->pCreateCtbReq, pStbRowsCxt->ctbName.tname, pStbRowsCxt->pTag,
|
||||||
pStbRowsCxt->stbName.tname, pStbRowsCxt->aTagNames, getNumOfTags(pStbRowsCxt->pStbMeta),
|
pStbRowsCxt->pStbMeta->uid, pStbRowsCxt->stbName.tname, pStbRowsCxt->aTagNames,
|
||||||
TSDB_DEFAULT_TABLE_TTL);
|
getNumOfTags(pStbRowsCxt->pStbMeta), TSDB_DEFAULT_TABLE_TTL);
|
||||||
pStbRowsCxt->pTag = NULL;
|
pStbRowsCxt->pTag = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1907,7 +1901,6 @@ static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnod
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
|
static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
|
||||||
if (pStbRowsCxt == NULL) return;
|
if (pStbRowsCxt == NULL) return;
|
||||||
|
|
||||||
|
@ -1948,7 +1941,9 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
|
||||||
SRow** pRow = taosArrayReserve((*ppTableDataCxt)->pData->aRowP, 1);
|
SRow** pRow = taosArrayReserve((*ppTableDataCxt)->pData->aRowP, 1);
|
||||||
code = tRowBuild(pStbRowsCxt->aColVals, (*ppTableDataCxt)->pSchema, pRow);
|
code = tRowBuild(pStbRowsCxt->aColVals, (*ppTableDataCxt)->pSchema, pRow);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
insCheckTableDataOrder(*ppTableDataCxt, TD_ROW_KEY(*pRow));
|
SRowKey key;
|
||||||
|
tRowGetKey(*pRow, &key);
|
||||||
|
insCheckTableDataOrder(*ppTableDataCxt, &key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1961,7 +1956,8 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow, SToken* pToken) {
|
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow,
|
||||||
|
SToken* pToken) {
|
||||||
SBoundColInfo* pCols = &pTableCxt->boundColsInfo;
|
SBoundColInfo* pCols = &pTableCxt->boundColsInfo;
|
||||||
bool isParseBindParam = false;
|
bool isParseBindParam = false;
|
||||||
SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta);
|
SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta);
|
||||||
|
@ -2014,7 +2010,9 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
|
||||||
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
|
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
|
||||||
code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
|
code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow));
|
SRowKey key;
|
||||||
|
tRowGetKey(*pRow, &key);
|
||||||
|
insCheckTableDataOrder(pTableCxt, &key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2112,7 +2110,8 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
|
||||||
code = parseOneRow(pCxt, (const char**)&pRow, rowsDataCxt.pTableDataCxt, &gotRow, &token);
|
code = parseOneRow(pCxt, (const char**)&pRow, rowsDataCxt.pTableDataCxt, &gotRow, &token);
|
||||||
} else {
|
} else {
|
||||||
STableDataCxt* pTableDataCxt = NULL;
|
STableDataCxt* pTableDataCxt = NULL;
|
||||||
code = parseOneStbRow(pCxt, pStmt, (const char**)&pRow, rowsDataCxt.pStbRowsCxt, &gotRow, &token, &pTableDataCxt);
|
code =
|
||||||
|
parseOneStbRow(pCxt, pStmt, (const char**)&pRow, rowsDataCxt.pStbRowsCxt, &gotRow, &token, &pTableDataCxt);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
SStbRowsDataContext* pStbRowsCxt = rowsDataCxt.pStbRowsCxt;
|
SStbRowsDataContext* pStbRowsCxt = rowsDataCxt.pStbRowsCxt;
|
||||||
void* pData = pTableDataCxt;
|
void* pData = pTableDataCxt;
|
||||||
|
@ -2149,11 +2148,11 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt) {
|
static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
|
||||||
|
SRowsDataContext rowsDataCxt) {
|
||||||
// init only for file
|
// init only for file
|
||||||
if (NULL == pStmt->pTableCxtHashObj) {
|
if (NULL == pStmt->pTableCxtHashObj) {
|
||||||
pStmt->pTableCxtHashObj =
|
pStmt->pTableCxtHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
|
||||||
}
|
}
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows);
|
int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows);
|
||||||
|
@ -2285,7 +2284,8 @@ static int32_t constructStbRowsDataContext(SVnodeModifyOpStmt* pStmt, SStbRowsDa
|
||||||
static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (!pStmt->pBoundCols) {
|
if (!pStmt->pBoundCols) {
|
||||||
return buildSyntaxErrMsg(&pCxt->msg, "(...tbname, ts...) bounded cols is expected for supertable insertion", pStmt->pSql);
|
return buildSyntaxErrMsg(&pCxt->msg, "(...tbname, ts...) bounded cols is expected for supertable insertion",
|
||||||
|
pStmt->pSql);
|
||||||
}
|
}
|
||||||
|
|
||||||
SStbRowsDataContext* pStbRowsCxt = NULL;
|
SStbRowsDataContext* pStbRowsCxt = NULL;
|
||||||
|
@ -2297,7 +2297,7 @@ static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModif
|
||||||
pStbRowsCxt->hasTimestampTag = false;
|
pStbRowsCxt->hasTimestampTag = false;
|
||||||
for (int32_t i = 0; i < pStbRowsCxt->boundColsInfo.numOfBound; ++i) {
|
for (int32_t i = 0; i < pStbRowsCxt->boundColsInfo.numOfBound; ++i) {
|
||||||
int16_t schemaIndex = pStbRowsCxt->boundColsInfo.pColIndex[i];
|
int16_t schemaIndex = pStbRowsCxt->boundColsInfo.pColIndex[i];
|
||||||
if (schemaIndex != getTbnameSchemaIndex(pStmt->pTableMeta) && schemaIndex >= getNumOfColumns(pStmt->pTableMeta) ) {
|
if (schemaIndex != getTbnameSchemaIndex(pStmt->pTableMeta) && schemaIndex >= getNumOfColumns(pStmt->pTableMeta)) {
|
||||||
if (pStmt->pTableMeta->schema[schemaIndex].type == TSDB_DATA_TYPE_TIMESTAMP) {
|
if (pStmt->pTableMeta->schema[schemaIndex].type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
pStbRowsCxt->hasTimestampTag = true;
|
pStbRowsCxt->hasTimestampTag = true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -170,9 +170,7 @@ static void initColValues(STableMeta* pTableMeta, SArray* pValues) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void insInitColValues(STableMeta* pTableMeta, SArray* aColValues) {
|
void insInitColValues(STableMeta* pTableMeta, SArray* aColValues) { initColValues(pTableMeta, aColValues); }
|
||||||
initColValues(pTableMeta, aColValues);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
|
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
|
||||||
pInfo->numOfCols = numOfBound;
|
pInfo->numOfCols = numOfBound;
|
||||||
|
@ -187,21 +185,22 @@ int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void insCheckTableDataOrder(STableDataCxt* pTableCxt, TSKEY tsKey) {
|
void insCheckTableDataOrder(STableDataCxt* pTableCxt, SRowKey* rowKey) {
|
||||||
// once the data block is disordered, we do NOT keep last timestamp any more
|
// once the data block is disordered, we do NOT keep last timestamp any more
|
||||||
if (!pTableCxt->ordered) {
|
if (!pTableCxt->ordered) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsKey < pTableCxt->lastTs) {
|
if (tRowKeyCompare(rowKey, &pTableCxt->lastKey) < 0) {
|
||||||
pTableCxt->ordered = false;
|
pTableCxt->ordered = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsKey == pTableCxt->lastTs) {
|
if (tRowKeyCompare(rowKey, &pTableCxt->lastKey) == 0) {
|
||||||
pTableCxt->duplicateTs = true;
|
pTableCxt->duplicateTs = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableCxt->lastTs = tsKey;
|
// TODO: for variable length data type, we need to copy it out
|
||||||
|
pTableCxt->lastKey = *rowKey;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -217,7 +216,7 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
pTableCxt->lastTs = 0;
|
pTableCxt->lastKey = (SRowKey){0};
|
||||||
pTableCxt->ordered = true;
|
pTableCxt->ordered = true;
|
||||||
pTableCxt->duplicateTs = false;
|
pTableCxt->duplicateTs = false;
|
||||||
|
|
||||||
|
@ -254,7 +253,7 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
|
||||||
pTableCxt->pData->uid = pTableMeta->uid;
|
pTableCxt->pData->uid = pTableMeta->uid;
|
||||||
pTableCxt->pData->sver = pTableMeta->sversion;
|
pTableCxt->pData->sver = pTableMeta->sversion;
|
||||||
pTableCxt->pData->pCreateTbReq = pCreateTbReq != NULL ? *pCreateTbReq : NULL;
|
pTableCxt->pData->pCreateTbReq = pCreateTbReq != NULL ? *pCreateTbReq : NULL;
|
||||||
if(pCreateTbReq != NULL) *pCreateTbReq = NULL;
|
if (pCreateTbReq != NULL) *pCreateTbReq = NULL;
|
||||||
if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
||||||
pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData));
|
pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData));
|
||||||
if (NULL == pTableCxt->pData->aCol) {
|
if (NULL == pTableCxt->pData->aCol) {
|
||||||
|
@ -317,7 +316,6 @@ static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void resetColValues(SArray* pValues) {
|
static void resetColValues(SArray* pValues) {
|
||||||
int32_t num = taosArrayGetSize(pValues);
|
int32_t num = taosArrayGetSize(pValues);
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
|
@ -463,7 +461,7 @@ static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHa
|
||||||
int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
|
int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
taosArrayPush(pVgroupList, &pVgCxt);
|
taosArrayPush(pVgroupList, &pVgCxt);
|
||||||
// uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid);
|
// uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid);
|
||||||
*pOutput = pVgCxt;
|
*pOutput = pVgCxt;
|
||||||
} else {
|
} else {
|
||||||
insDestroyVgroupDataCxt(pVgCxt);
|
insDestroyVgroupDataCxt(pVgCxt);
|
||||||
|
@ -613,7 +611,7 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
|
dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
|
||||||
code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
|
code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
|
||||||
// uError("td23101 3vgId:%d, numEps:%d", src->vgId, dst->vg.epSet.numOfEps);
|
// uError("td23101 3vgId:%d, numEps:%d", src->vgId, dst->vg.epSet.numOfEps);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
|
code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
|
||||||
|
@ -634,7 +632,7 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
|
||||||
|
|
||||||
static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
|
static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
|
||||||
for (int i = 0; i < numFields; i++) {
|
for (int i = 0; i < numFields; i++) {
|
||||||
if(strcmp(pSchema->name, fields[i].name) == 0){
|
if (strcmp(pSchema->name, fields[i].name) == 0) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -644,7 +642,8 @@ static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
|
||||||
|
|
||||||
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD* tFields,
|
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD* tFields,
|
||||||
int numFields, bool needChangeLength) {
|
int numFields, bool needChangeLength) {
|
||||||
void* tmp = taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
|
void* tmp =
|
||||||
|
taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
|
||||||
STableDataCxt* pTableCxt = NULL;
|
STableDataCxt* pTableCxt = NULL;
|
||||||
int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
|
int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
|
||||||
sizeof(pTableMeta->uid), pTableMeta, pCreateTb, &pTableCxt, true, false);
|
sizeof(pTableMeta->uid), pTableMeta, pCreateTb, &pTableCxt, true, false);
|
||||||
|
@ -654,7 +653,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
|
pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
|
||||||
if(tmp == NULL){
|
if (tmp == NULL) {
|
||||||
ret = initTableColSubmitData(pTableCxt);
|
ret = initTableColSubmitData(pTableCxt);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
uError("initTableColSubmitData error");
|
uError("initTableColSubmitData error");
|
||||||
|
@ -663,8 +662,8 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
||||||
}
|
}
|
||||||
|
|
||||||
char* p = (char*)data;
|
char* p = (char*)data;
|
||||||
// | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each column
|
// | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
|
||||||
// length |
|
// column length |
|
||||||
int32_t version = *(int32_t*)data;
|
int32_t version = *(int32_t*)data;
|
||||||
p += sizeof(int32_t);
|
p += sizeof(int32_t);
|
||||||
p += sizeof(int32_t);
|
p += sizeof(int32_t);
|
||||||
|
@ -699,8 +698,8 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
||||||
ret = TSDB_CODE_INVALID_PARA;
|
ret = TSDB_CODE_INVALID_PARA;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
if(tFields == NULL){
|
if (tFields == NULL) {
|
||||||
for (int j = 0; j < boundInfo->numOfBound; j++){
|
for (int j = 0; j < boundInfo->numOfBound; j++) {
|
||||||
SSchema* pColSchema = &pSchema[j];
|
SSchema* pColSchema = &pSchema[j];
|
||||||
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);
|
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);
|
||||||
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
|
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
|
||||||
|
@ -717,7 +716,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
||||||
}
|
}
|
||||||
char* pData = pStart;
|
char* pData = pStart;
|
||||||
ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
|
ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
|
||||||
if(ret != 0){
|
if (ret != 0) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
fields += sizeof(int8_t) + sizeof(int32_t);
|
fields += sizeof(int8_t) + sizeof(int32_t);
|
||||||
|
@ -727,11 +726,11 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
||||||
pStart += colLength[j];
|
pStart += colLength[j];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}else{
|
} else {
|
||||||
for (int i = 0; i < numFields; i++) {
|
for (int i = 0; i < numFields; i++) {
|
||||||
for (int j = 0; j < boundInfo->numOfBound; j++){
|
for (int j = 0; j < boundInfo->numOfBound; j++) {
|
||||||
SSchema* pColSchema = &pSchema[j];
|
SSchema* pColSchema = &pSchema[j];
|
||||||
if(strcmp(pColSchema->name, tFields[i].name) == 0){
|
if (strcmp(pColSchema->name, tFields[i].name) == 0) {
|
||||||
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
|
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
|
||||||
uError("type or bytes not equal");
|
uError("type or bytes not equal");
|
||||||
ret = TSDB_CODE_INVALID_PARA;
|
ret = TSDB_CODE_INVALID_PARA;
|
||||||
|
@ -748,7 +747,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
||||||
|
|
||||||
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);
|
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);
|
||||||
ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
|
ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
|
||||||
if(ret != 0){
|
if (ret != 0) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
fields += sizeof(int8_t) + sizeof(int32_t);
|
fields += sizeof(int8_t) + sizeof(int32_t);
|
||||||
|
@ -761,17 +760,16 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int c = 0; c < boundInfo->numOfBound; ++c) {
|
for (int c = 0; c < boundInfo->numOfBound; ++c) {
|
||||||
if( boundInfo->pColIndex[c] != -1){
|
if (boundInfo->pColIndex[c] != -1) {
|
||||||
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
|
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
|
||||||
ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
|
ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
|
||||||
if(ret != 0){
|
if (ret != 0) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
}else{
|
} else {
|
||||||
boundInfo->pColIndex[c] = c; // restore for next block
|
boundInfo->pColIndex[c] = c; // restore for next block
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -690,19 +690,23 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t stbSplCreateMergeKeysByExpr(SNode* pExpr, EOrder order, SNodeList** pMergeKeys) {
|
||||||
|
SOrderByExprNode* pOrderByExpr = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
|
||||||
|
if (NULL == pOrderByExpr) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
pOrderByExpr->pExpr = nodesCloneNode(pExpr);
|
||||||
|
if (NULL == pOrderByExpr->pExpr) {
|
||||||
|
nodesDestroyNode((SNode*)pOrderByExpr);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
pOrderByExpr->order = order;
|
||||||
|
pOrderByExpr->nullOrder = (order == ORDER_ASC) ? NULL_ORDER_FIRST : NULL_ORDER_LAST;
|
||||||
|
return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pOrderByExpr);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, EOrder order, SNodeList** pMergeKeys) {
|
static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, EOrder order, SNodeList** pMergeKeys) {
|
||||||
SOrderByExprNode* pMergeKey = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
|
return stbSplCreateMergeKeysByExpr(pPrimaryKey, order, pMergeKeys);
|
||||||
if (NULL == pMergeKey) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
pMergeKey->pExpr = nodesCloneNode(pPrimaryKey);
|
|
||||||
if (NULL == pMergeKey->pExpr) {
|
|
||||||
nodesDestroyNode((SNode*)pMergeKey);
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
pMergeKey->order = order;
|
|
||||||
pMergeKey->nullOrder = NULL_ORDER_FIRST;
|
|
||||||
return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pMergeKey);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||||
|
@ -1357,6 +1361,28 @@ static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) {
|
||||||
return pCol;
|
return pCol;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SNode* stbSplFindPkFromScan(SScanLogicNode* pScan) {
|
||||||
|
bool find = false;
|
||||||
|
SNode* pCol = NULL;
|
||||||
|
FOREACH(pCol, pScan->pScanCols) {
|
||||||
|
if (((SColumnNode*)pCol)->isPk) {
|
||||||
|
find = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!find) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
SNode* pTarget = NULL;
|
||||||
|
FOREACH(pTarget, pScan->node.pTargets) {
|
||||||
|
if (nodesEqualNode(pTarget, pCol)) {
|
||||||
|
return pCol;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nodesListStrictAppend(pScan->node.pTargets, nodesCloneNode(pCol));
|
||||||
|
return pCol;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOutputMergeScan,
|
static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOutputMergeScan,
|
||||||
SNodeList** pOutputMergeKeys) {
|
SNodeList** pOutputMergeKeys) {
|
||||||
SNodeList* pChildren = pScan->node.pChildren;
|
SNodeList* pChildren = pScan->node.pChildren;
|
||||||
|
@ -1374,8 +1400,13 @@ static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOu
|
||||||
pMergeScan->filesetDelimited = true;
|
pMergeScan->filesetDelimited = true;
|
||||||
pMergeScan->node.pChildren = pChildren;
|
pMergeScan->node.pChildren = pChildren;
|
||||||
splSetParent((SLogicNode*)pMergeScan);
|
splSetParent((SLogicNode*)pMergeScan);
|
||||||
code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pMergeScan),
|
|
||||||
pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);
|
SNode* pTs = stbSplFindPrimaryKeyFromScan(pMergeScan);
|
||||||
|
code = stbSplCreateMergeKeysByPrimaryKey(pTs, pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);
|
||||||
|
SNode* pPk = stbSplFindPkFromScan(pMergeScan);
|
||||||
|
if (TSDB_CODE_SUCCESS == code && NULL != pPk) {
|
||||||
|
code = stbSplCreateMergeKeysByExpr(pPk, pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
|
Loading…
Reference in New Issue