Merge branch '3.0' into feat/TD-30813-2
This commit is contained in:
commit
1cfb58e7fe
|
@ -934,7 +934,8 @@ SColVal *tRowIterNext(SRowIter *pIter) {
|
|||
pIter->cv = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
|
||||
goto _exit;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
uError("unexpected column id %d, %d", cid, pTColumn->colId);
|
||||
goto _exit;
|
||||
}
|
||||
} else {
|
||||
pIter->cv = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
|
||||
|
@ -1356,11 +1357,8 @@ int32_t tValueCompare(const SValue *tv1, const SValue *tv2) {
|
|||
int32_t ret = memcmp(tv1->pData, tv2->pData, tv1->nData < tv2->nData ? tv1->nData : tv2->nData);
|
||||
return ret ? ret : (tv1->nData < tv2->nData ? -1 : (tv1->nData > tv2->nData ? 1 : 0));
|
||||
}
|
||||
case TSDB_DATA_TYPE_DECIMAL:
|
||||
ASSERT(0);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
break;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -2600,7 +2598,7 @@ static FORCE_INLINE void tColDataGetValue3(SColData *pColData, int32_t iVal,
|
|||
*pColVal = COL_VAL_NULL(pColData->cid, pColData->type);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
static FORCE_INLINE void tColDataGetValue4(SColData *pColData, int32_t iVal, SColVal *pColVal) { // HAS_VALUE
|
||||
|
@ -2628,7 +2626,7 @@ static FORCE_INLINE void tColDataGetValue5(SColData *pColData, int32_t iVal,
|
|||
tColDataGetValue4(pColData, iVal, pColVal);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
static FORCE_INLINE void tColDataGetValue6(SColData *pColData, int32_t iVal,
|
||||
|
@ -2641,7 +2639,7 @@ static FORCE_INLINE void tColDataGetValue6(SColData *pColData, int32_t iVal,
|
|||
tColDataGetValue4(pColData, iVal, pColVal);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
static FORCE_INLINE void tColDataGetValue7(SColData *pColData, int32_t iVal,
|
||||
|
@ -2657,7 +2655,7 @@ static FORCE_INLINE void tColDataGetValue7(SColData *pColData, int32_t iVal,
|
|||
tColDataGetValue4(pColData, iVal, pColVal);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
static void (*tColDataGetValueImpl[])(SColData *pColData, int32_t iVal, SColVal *pColVal) = {
|
||||
|
@ -2671,7 +2669,6 @@ static void (*tColDataGetValueImpl[])(SColData *pColData, int32_t iVal, SColVal
|
|||
tColDataGetValue7 // HAS_VALUE | HAS_NULL | HAS_NONE
|
||||
};
|
||||
void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) {
|
||||
ASSERT(iVal >= 0 && iVal < pColData->nVal && pColData->flag);
|
||||
tColDataGetValueImpl[pColData->flag](pColData, iVal, pColVal);
|
||||
}
|
||||
|
||||
|
@ -3526,7 +3523,8 @@ static void tColDataMergeImpl(SColData *pColData, int32_t iStart, int32_t iEnd /
|
|||
} else if (bv == BIT_FLG_NULL) {
|
||||
flag |= HAS_NULL;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
uError("invalid bit value:%d", bv);
|
||||
return;
|
||||
}
|
||||
|
||||
if (flag == pColData->flag) break;
|
||||
|
|
|
@ -397,11 +397,6 @@ char getPrecisionUnit(int32_t precision) {
|
|||
}
|
||||
|
||||
int64_t convertTimePrecision(int64_t utime, int32_t fromPrecision, int32_t toPrecision) {
|
||||
ASSERT(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
|
||||
fromPrecision == TSDB_TIME_PRECISION_NANO);
|
||||
ASSERT(toPrecision == TSDB_TIME_PRECISION_MILLI || toPrecision == TSDB_TIME_PRECISION_MICRO ||
|
||||
toPrecision == TSDB_TIME_PRECISION_NANO);
|
||||
|
||||
switch (fromPrecision) {
|
||||
case TSDB_TIME_PRECISION_MILLI: {
|
||||
switch (toPrecision) {
|
||||
|
@ -418,7 +413,6 @@ int64_t convertTimePrecision(int64_t utime, int32_t fromPrecision, int32_t toPre
|
|||
}
|
||||
return utime * 1000000;
|
||||
default:
|
||||
ASSERT(0);
|
||||
return utime;
|
||||
}
|
||||
} // end from milli
|
||||
|
@ -434,7 +428,6 @@ int64_t convertTimePrecision(int64_t utime, int32_t fromPrecision, int32_t toPre
|
|||
}
|
||||
return utime * 1000;
|
||||
default:
|
||||
ASSERT(0);
|
||||
return utime;
|
||||
}
|
||||
} // end from micro
|
||||
|
@ -447,12 +440,10 @@ int64_t convertTimePrecision(int64_t utime, int32_t fromPrecision, int32_t toPre
|
|||
case TSDB_TIME_PRECISION_NANO:
|
||||
return utime;
|
||||
default:
|
||||
ASSERT(0);
|
||||
return utime;
|
||||
}
|
||||
} // end from nano
|
||||
default: {
|
||||
ASSERT(0);
|
||||
return utime; // only to pass windows compilation
|
||||
}
|
||||
} // end switch fromPrecision
|
||||
|
@ -463,10 +454,6 @@ int64_t convertTimePrecision(int64_t utime, int32_t fromPrecision, int32_t toPre
|
|||
// !!!!notice:there are precision problems, double lose precison if time is too large, for example:
|
||||
// 1626006833631000000*1.0 = double = 1626006833631000064
|
||||
// int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) {
|
||||
// assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
|
||||
// fromPrecision == TSDB_TIME_PRECISION_NANO);
|
||||
// assert(toPrecision == TSDB_TIME_PRECISION_MILLI || toPrecision == TSDB_TIME_PRECISION_MICRO ||
|
||||
// toPrecision == TSDB_TIME_PRECISION_NANO);
|
||||
// static double factors[3][3] = {{1., 1000., 1000000.}, {1.0 / 1000, 1., 1000.}, {1.0 / 1000000, 1.0 / 1000, 1.}};
|
||||
// ((double)time * factors[fromPrecision][toPrecision]);
|
||||
//}
|
||||
|
@ -783,7 +770,6 @@ int32_t taosTimeCountIntervalForFill(int64_t skey, int64_t ekey, int64_t interva
|
|||
|
||||
int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval) {
|
||||
if (pInterval->sliding == 0) {
|
||||
ASSERT(pInterval->interval == 0);
|
||||
return ts;
|
||||
}
|
||||
|
||||
|
@ -814,7 +800,6 @@ int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval) {
|
|||
} else {
|
||||
if (IS_CALENDAR_TIME_DURATION(pInterval->intervalUnit)) {
|
||||
int64_t news = (ts / pInterval->sliding) * pInterval->sliding;
|
||||
ASSERT(news <= ts);
|
||||
if (pInterval->slidingUnit == 'd' || pInterval->slidingUnit == 'w') {
|
||||
#if defined(WINDOWS) && _MSC_VER >= 1900
|
||||
int64_t timezone = _timezone;
|
||||
|
@ -887,8 +872,6 @@ int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval) {
|
|||
}
|
||||
}
|
||||
|
||||
ASSERT(pInterval->offset >= 0);
|
||||
|
||||
if (pInterval->offset > 0) {
|
||||
// try to move current window to the left-hande-side, due to the offset effect.
|
||||
int64_t newe = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
||||
|
@ -1284,7 +1267,6 @@ static int32_t parseTsFormat(const char* formatStr, SArray* formats) {
|
|||
}
|
||||
if (last) {
|
||||
// expand
|
||||
assert(last->type == TS_FORMAT_NODE_TYPE_CHAR);
|
||||
last->len++;
|
||||
formatStr++;
|
||||
} else {
|
||||
|
@ -1311,7 +1293,6 @@ static int32_t parseTsFormat(const char* formatStr, SArray* formats) {
|
|||
}
|
||||
}
|
||||
if (lastOtherFormat) {
|
||||
assert(lastOtherFormat->type == TS_FORMAT_NODE_TYPE_CHAR);
|
||||
lastOtherFormat->len++;
|
||||
formatStr++;
|
||||
} else {
|
||||
|
@ -1664,7 +1645,6 @@ static int32_t char2ts(const char* s, SArray* formats, int64_t* ts, int32_t prec
|
|||
}
|
||||
continue;
|
||||
}
|
||||
assert(node->type == TS_FORMAT_NODE_TYPE_KEYWORD);
|
||||
switch (node->key->id) {
|
||||
case TSFKW_A_M:
|
||||
case TSFKW_P_M:
|
||||
|
@ -1929,7 +1909,7 @@ static int32_t char2ts(const char* s, SArray* formats, int64_t* ts, int32_t prec
|
|||
int32_t taosTs2Char(const char* format, SArray** formats, int64_t ts, int32_t precision, char* out, int32_t outLen) {
|
||||
if (!*formats) {
|
||||
*formats = taosArrayInit(8, sizeof(TSFormatNode));
|
||||
if (!*formats){
|
||||
if (!*formats) {
|
||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
TAOS_CHECK_RETURN(parseTsFormat(format, *formats));
|
||||
|
@ -2002,7 +1982,7 @@ static int8_t UNIT_INDEX[26] = {/*a*/ 2, 0, -1, 6, -1, -1, -1,
|
|||
/*o*/ -1, -1, -1, -1, 3, -1,
|
||||
/*u*/ 1, -1, 7, -1, 9, -1};
|
||||
|
||||
#define GET_UNIT_INDEX(idx) UNIT_INDEX[(idx) - 97]
|
||||
#define GET_UNIT_INDEX(idx) UNIT_INDEX[(idx)-97]
|
||||
|
||||
// clang-format off
|
||||
static int64_t UNIT_MATRIX[10][11] = { /* ns, us, ms, s, min, h, d, w, month, y*/
|
||||
|
|
|
@ -3147,7 +3147,7 @@ int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_
|
|||
if (TSDB_CODE_MND_USER_NOT_EXIST == code) {
|
||||
SGetUserAuthRsp rsp = {.dropped = 1};
|
||||
(void)memcpy(rsp.user, pUsers[i].user, TSDB_USER_LEN);
|
||||
(void)taosArrayPush(batchRsp.pArray, &rsp);
|
||||
TSDB_CHECK_NULL(taosArrayPush(batchRsp.pArray, &rsp), code, lino, _OVER, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
mError("user:%s, failed to auth user since %s", pUsers[i].user, tstrerror(code));
|
||||
code = 0;
|
||||
|
@ -3168,7 +3168,12 @@ int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_
|
|||
TAOS_CHECK_GOTO(code, &lino, _OVER);
|
||||
}
|
||||
|
||||
(void)taosArrayPush(batchRsp.pArray, &rsp);
|
||||
if (!(taosArrayPush(batchRsp.pArray, &rsp))) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
tFreeSGetUserAuthRsp(&rsp);
|
||||
TAOS_CHECK_GOTO(code, &lino, _OVER);
|
||||
}
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
}
|
||||
|
||||
|
|
|
@ -213,7 +213,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
|
|||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
}
|
||||
SSDataBlock datablock = {.info.type = STREAM_CHECKPOINT};
|
||||
(void)taosArrayPush(pRSmaStat->blocks, &datablock);
|
||||
TSDB_CHECK_NULL(taosArrayPush(pRSmaStat->blocks, &datablock), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
|
||||
|
||||
// init smaMgmt
|
||||
TAOS_CHECK_GOTO(smaInit(), &lino, _exit);
|
||||
|
|
|
@ -1430,7 +1430,7 @@ static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type) {
|
|||
blockDataDestroy(packData->pDataBlock);
|
||||
}
|
||||
} else {
|
||||
ASSERTS(0, "unknown type:%d", type);
|
||||
smaWarn("%s:%d unknown type:%d", __func__, __LINE__, type);
|
||||
}
|
||||
taosArrayClear(pItems);
|
||||
}
|
||||
|
@ -1540,14 +1540,13 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
|
|||
++nDelete;
|
||||
}
|
||||
} else {
|
||||
ASSERTS(0, "unknown msg type:%d", inputType);
|
||||
smaWarn("%s:%d unknown msg type:%d", __func__, __LINE__, inputType);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (nSubmit > 0 || nDelete > 0) {
|
||||
int32_t size = TARRAY_SIZE(pSubmitArr);
|
||||
ASSERTS(size > 0, "size is %d", size);
|
||||
int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK;
|
||||
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
|
||||
TAOS_CHECK_EXIT(tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, version, inputType, pInfo, type, i));
|
||||
|
@ -1677,7 +1676,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
ASSERTS(0, "unknown rsma exec type:%d", (int32_t)type);
|
||||
smaWarn("%s:%d unknown rsma exec type:%d", __func__, __LINE__, (int32_t)type);
|
||||
code = TSDB_CODE_APP_ERROR;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
|
|
@ -1036,7 +1036,9 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData
|
|||
|
||||
code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, bData->suid != 0 ? bData->suid : bData->uid,
|
||||
&cmprInfo.pColCmpr);
|
||||
ASSERT(code == TSDB_CODE_SUCCESS);
|
||||
if (code) {
|
||||
tsdbWarn("vgId:%d failed to get column compress algrithm", TD_VID(writer->config->tsdb->pVnode));
|
||||
}
|
||||
|
||||
TAOS_CHECK_GOTO(tBlockDataCompress(bData, &cmprInfo, buffers, assist), &lino, _exit);
|
||||
|
||||
|
|
|
@ -319,7 +319,7 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray
|
|||
} else { // all blocks are qualified
|
||||
taosArrayClear(pBlockLoadInfo->aSttBlk);
|
||||
px = taosArrayAddBatch(pBlockLoadInfo->aSttBlk, pArray->data, pArray->size);
|
||||
if (px == NULL){
|
||||
if (px == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
@ -336,7 +336,7 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray
|
|||
}
|
||||
|
||||
if (p->suid == suid) {
|
||||
void* px = taosArrayPush(pTmp, p);
|
||||
void *px = taosArrayPush(pTmp, p);
|
||||
if (px == NULL) {
|
||||
code = terrno;
|
||||
break;
|
||||
|
@ -372,7 +372,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
|||
TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
void* px = NULL;
|
||||
void *px = NULL;
|
||||
int32_t startIndex = 0;
|
||||
|
||||
int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray);
|
||||
|
@ -415,7 +415,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
|||
|
||||
// existed
|
||||
if (i < rows) {
|
||||
SSttTableRowsInfo* pInfo = &pBlockLoadInfo->info;
|
||||
SSttTableRowsInfo *pInfo = &pBlockLoadInfo->info;
|
||||
|
||||
if (pInfo->pUid == NULL) {
|
||||
pInfo->pUid = taosArrayInit(rows, sizeof(int64_t));
|
||||
|
@ -530,7 +530,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
|||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
_end:
|
||||
(void)tStatisBlockDestroy(&block);
|
||||
|
||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
@ -672,7 +672,7 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32
|
|||
}
|
||||
|
||||
void tLDataIterClose2(SLDataIter *pIter) {
|
||||
(void) tsdbSttFileReaderClose(&pIter->pReader); // always return 0
|
||||
(void)tsdbSttFileReaderClose(&pIter->pReader); // always return 0
|
||||
pIter->pReader = NULL;
|
||||
}
|
||||
|
||||
|
@ -826,7 +826,7 @@ static int32_t findNextValidRow(SLDataIter *pIter, const char *idStr) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tLDataIterNextRow(SLDataIter *pIter, const char *idStr, bool* hasNext) {
|
||||
int32_t tLDataIterNextRow(SLDataIter *pIter, const char *idStr, bool *hasNext) {
|
||||
int32_t step = pIter->backward ? -1 : 1;
|
||||
int32_t code = 0;
|
||||
int32_t iBlockL = pIter->iSttBlk;
|
||||
|
@ -1020,7 +1020,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
|
|||
|
||||
// let's record the time window for current table of uid in the stt files
|
||||
if (pSttDataInfo != NULL && numOfRows > 0) {
|
||||
void* px = taosArrayPush(pSttDataInfo->pKeyRangeList, &range);
|
||||
void *px = taosArrayPush(pSttDataInfo->pKeyRangeList, &range);
|
||||
if (px == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
@ -1041,7 +1041,7 @@ _end:
|
|||
return code;
|
||||
}
|
||||
|
||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { (void) tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); }
|
||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { (void)tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); }
|
||||
|
||||
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) { return pMTree->ignoreEarlierTs; }
|
||||
|
||||
|
@ -1050,14 +1050,12 @@ static void tLDataIterPinSttBlock(SLDataIter *pIter, const char *id) {
|
|||
|
||||
if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) {
|
||||
pInfo->blockData[0].pin = true;
|
||||
ASSERT(!pInfo->blockData[1].pin);
|
||||
tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
|
||||
return;
|
||||
}
|
||||
|
||||
if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) {
|
||||
pInfo->blockData[1].pin = true;
|
||||
ASSERT(!pInfo->blockData[0].pin);
|
||||
tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
|
||||
return;
|
||||
}
|
||||
|
@ -1068,14 +1066,12 @@ static void tLDataIterPinSttBlock(SLDataIter *pIter, const char *id) {
|
|||
static void tLDataIterUnpinSttBlock(SLDataIter *pIter, const char *id) {
|
||||
SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
|
||||
if (pInfo->blockData[0].pin) {
|
||||
ASSERT(!pInfo->blockData[1].pin);
|
||||
pInfo->blockData[0].pin = false;
|
||||
tsdbTrace("unpin stt-block:%d, stt-fileVer:%" PRId64 " %s", pInfo->blockData[0].sttBlockIndex, pIter->cid, id);
|
||||
return;
|
||||
}
|
||||
|
||||
if (pInfo->blockData[1].pin) {
|
||||
ASSERT(!pInfo->blockData[0].pin);
|
||||
pInfo->blockData[1].pin = false;
|
||||
tsdbTrace("unpin stt-block:%d, stt-fileVer:%" PRId64 " %s", pInfo->blockData[1].sttBlockIndex, pIter->cid, id);
|
||||
return;
|
||||
|
|
|
@ -126,7 +126,7 @@ uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint6
|
|||
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
|
||||
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo,
|
||||
int32_t* num);
|
||||
uint64_t tableListGetSize(const STableListInfo* pTableList);
|
||||
int32_t tableListGetSize(const STableListInfo* pTableList, int32_t* pRes);
|
||||
uint64_t tableListGetSuid(const STableListInfo* pTableList);
|
||||
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index);
|
||||
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex);
|
||||
|
|
|
@ -176,7 +176,10 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
|
|||
code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds, &pInfo->pDstSlotIds);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
int32_t totalTables = tableListGetSize(pTableListInfo);
|
||||
int32_t totalTables = 0;
|
||||
code = tableListGetSize(pTableListInfo, &totalTables);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
int32_t capacity = 0;
|
||||
|
||||
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
|
||||
|
@ -271,7 +274,10 @@ int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
SSDataBlock* pBufRes = pInfo->pBufferedRes;
|
||||
|
||||
uint64_t suid = tableListGetSuid(pTableList);
|
||||
int32_t size = tableListGetSize(pTableList);
|
||||
int32_t size = 0;
|
||||
code = tableListGetSize(pTableList, &size);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (size == 0) {
|
||||
setOperatorCompleted(pOperator);
|
||||
(*ppRes) = NULL;
|
||||
|
|
|
@ -894,12 +894,12 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
|
|||
|
||||
int32_t rawLen = *(int32_t*)pStart;
|
||||
pStart += sizeof(int32_t);
|
||||
ASSERT(compLen <= rawLen && compLen != 0);
|
||||
QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||
|
||||
pNextStart = pStart + compLen;
|
||||
if (pRetrieveRsp->compressed && (compLen < rawLen)) {
|
||||
int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
|
||||
ASSERT(t == rawLen);
|
||||
QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||
pStart = pDataInfo->decompBuf;
|
||||
}
|
||||
|
||||
|
|
|
@ -197,7 +197,6 @@ void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL
|
|||
pGroupResInfo->freeItem = true;
|
||||
pGroupResInfo->pRows = pArrayList;
|
||||
pGroupResInfo->index = 0;
|
||||
ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
|
||||
}
|
||||
|
||||
bool hasRemainResults(SGroupResInfo* pGroupResInfo) {
|
||||
|
@ -1560,7 +1559,12 @@ int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode,
|
|||
return code;
|
||||
}
|
||||
|
||||
ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
|
||||
if (nodeType(pNew) != QUERY_NODE_VALUE) {
|
||||
nodesDestroyList(groupNew);
|
||||
pAPI->metaReaderFn.clearReader(&mr);
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
|
||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
SValueNode* pValue = (SValueNode*)pNew;
|
||||
|
||||
if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
|
||||
|
@ -1879,7 +1883,8 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
|
||||
pExp->pExpr->_optrRoot.pRootNode = pNode;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
_end:
|
||||
|
@ -2149,7 +2154,8 @@ int32_t relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SAr
|
|||
} else if (p->info.colId < pmInfo->colId) {
|
||||
i++;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
|
||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
}
|
||||
return code;
|
||||
|
@ -2383,9 +2389,13 @@ void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo) {
|
|||
pLimitInfo->remainOffset = pLimitInfo->limit.offset;
|
||||
}
|
||||
|
||||
uint64_t tableListGetSize(const STableListInfo* pTableList) {
|
||||
ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map));
|
||||
return taosArrayGetSize(pTableList->pTableList);
|
||||
int32_t tableListGetSize(const STableListInfo* pTableList, int32_t* pRes) {
|
||||
if (taosArrayGetSize(pTableList->pTableList) != taosHashGetSize(pTableList->map)) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
|
||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
(*pRes) = taosArrayGetSize(pTableList->pTableList);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
uint64_t tableListGetSuid(const STableListInfo* pTableList) { return pTableList->idInfo.suid; }
|
||||
|
@ -2430,7 +2440,6 @@ uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tab
|
|||
}
|
||||
|
||||
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
|
||||
ASSERT(pKeyInfo->uid == tableUid);
|
||||
|
||||
return pKeyInfo->groupId;
|
||||
}
|
||||
|
@ -2457,7 +2466,8 @@ int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t
|
|||
int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
|
||||
code = taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
ASSERT(code != TSDB_CODE_DUP_KEY); // we have checked the existence of uid in hash map above
|
||||
// we have checked the existence of uid in hash map above
|
||||
QUERY_CHECK_CONDITION((code != TSDB_CODE_DUP_KEY), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||
taosArrayPopTailBatch(pTableList->pTableList, 1); // let's pop the last element in the array list
|
||||
}
|
||||
|
||||
|
@ -2474,7 +2484,12 @@ _end:
|
|||
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
|
||||
int32_t* size) {
|
||||
int32_t totalGroups = tableListGetOutputGroups(pTableList);
|
||||
int32_t numOfTables = tableListGetSize(pTableList);
|
||||
int32_t numOfTables = 0;
|
||||
int32_t code = tableListGetSize(pTableList, &numOfTables);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
if (ordinalGroupIndex < 0 || ordinalGroupIndex >= totalGroups) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
|
|
|
@ -1295,7 +1295,13 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
|
||||
// this value may be changed if new tables are created
|
||||
taosRLockLatch(&pTaskInfo->lock);
|
||||
int32_t numOfTables = tableListGetSize(pTableListInfo);
|
||||
int32_t numOfTables = 0;
|
||||
code = tableListGetSize(pTableListInfo, &numOfTables);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
return code;
|
||||
}
|
||||
|
||||
if (uid == 0) {
|
||||
if (numOfTables != 0) {
|
||||
|
@ -1439,7 +1445,13 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
tDeleteSchemaWrapper(mtInfo.schema);
|
||||
return code;
|
||||
}
|
||||
int32_t size = tableListGetSize(pTableListInfo);
|
||||
int32_t size = 0;
|
||||
code = tableListGetSize(pTableListInfo, &size);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
tDeleteSchemaWrapper(mtInfo.schema);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size,
|
||||
NULL, (void**)&pInfo->dataReader, NULL, NULL);
|
||||
|
@ -1520,7 +1532,10 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
|
|||
SArray* pUidList = taosArrayInit(10, sizeof(uint64_t));
|
||||
QUERY_CHECK_NULL(pUidList, code, lino, _end, terrno);
|
||||
|
||||
int32_t numOfTables = tableListGetSize(pTableListInfo);
|
||||
int32_t numOfTables = 0;
|
||||
code = tableListGetSize(pTableListInfo, &numOfTables);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
|
||||
QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno);
|
||||
|
|
|
@ -1115,7 +1115,14 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo*
|
|||
pDeleterParam->suid = tableListGetSuid(pTableListInfo);
|
||||
|
||||
// TODO extract uid list
|
||||
int32_t numOfTables = tableListGetSize(pTableListInfo);
|
||||
int32_t numOfTables = 0;
|
||||
code = tableListGetSize(pTableListInfo, &numOfTables);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
taosMemoryFree(pDeleterParam);
|
||||
return code;
|
||||
}
|
||||
|
||||
pDeleterParam->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
|
||||
if (NULL == pDeleterParam->pUidList) {
|
||||
taosMemoryFree(pDeleterParam);
|
||||
|
|
|
@ -415,7 +415,8 @@ static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiN
|
|||
int32_t code = createExprFromTargetNode(&pExpr[pExprSupp->numOfExprs], (STargetNode*)pPhyFillNode->pWStartTs);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
taosMemoryFreeClear(pExpr);
|
||||
pExprSupp->numOfExprs += 1;
|
||||
pExprSupp->pExprInfo = pExpr;
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -447,6 +448,7 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
|
||||
|
||||
SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
|
||||
code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->pExprInfo, &pNoFillSupp->numOfExprs);
|
||||
|
@ -511,7 +513,6 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
|
|||
}
|
||||
|
||||
setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||
pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
|
||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL,
|
||||
optrDefaultGetNextExtFn, NULL);
|
||||
|
||||
|
|
|
@ -213,7 +213,6 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
|
|||
memcpy(pkey->pData, val, dataLen);
|
||||
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
|
||||
memcpy(pkey->pData, val, varDataTLen(val));
|
||||
ASSERT(varDataTLen(val) <= pkey->bytes);
|
||||
} else {
|
||||
memcpy(pkey->pData, val, pkey->bytes);
|
||||
}
|
||||
|
@ -241,7 +240,6 @@ static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
|
|||
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
|
||||
varDataCopy(pStart, pkey->pData);
|
||||
pStart += varDataTLen(pkey->pData);
|
||||
ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
|
||||
} else {
|
||||
memcpy(pStart, pkey->pData, pkey->bytes);
|
||||
pStart += pkey->bytes;
|
||||
|
@ -740,7 +738,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
|
||||
memcpy(data + (*columnLen), src, dataLen);
|
||||
int32_t v = (data + (*columnLen) + dataLen - (char*)pPage);
|
||||
ASSERT(v > 0);
|
||||
QUERY_CHECK_CONDITION((v > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||
|
||||
contentLen = dataLen;
|
||||
} else {
|
||||
|
@ -748,7 +746,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
char* src = colDataGetData(pColInfoData, j);
|
||||
memcpy(data + (*columnLen), src, varDataTLen(src));
|
||||
int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
|
||||
ASSERT(v > 0);
|
||||
QUERY_CHECK_CONDITION((v > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||
|
||||
contentLen = varDataTLen(src);
|
||||
}
|
||||
|
@ -762,7 +760,8 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
colDataSetNull_f(bitmap, (*rows));
|
||||
} else {
|
||||
memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
|
||||
ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf));
|
||||
QUERY_CHECK_CONDITION(((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf)), code,
|
||||
lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||
}
|
||||
contentLen = bytes;
|
||||
}
|
||||
|
@ -1299,7 +1298,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
|
|||
|
||||
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
|
||||
SSDataBlock* pDest = pInfo->binfo.pRes;
|
||||
ASSERT(hasRemainPartion(pInfo));
|
||||
QUERY_CHECK_CONDITION((hasRemainPartion(pInfo)), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||
SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->parIte;
|
||||
blockDataCleanup(pDest);
|
||||
int32_t rows = taosArrayGetSize(pParInfo->rowIds);
|
||||
|
@ -1343,7 +1342,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
|
|||
pDest->info.id.groupId = pParInfo->groupId;
|
||||
pOperator->resultInfo.totalRows += pDest->info.rows;
|
||||
pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte);
|
||||
ASSERT(pDest->info.rows > 0);
|
||||
QUERY_CHECK_CONDITION((pDest->info.rows > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1549,7 +1548,8 @@ static int32_t doStreamHashPartitionNext(SOperatorInfo* pOperator, SSDataBlock**
|
|||
return code;
|
||||
}
|
||||
default:
|
||||
ASSERTS(0, "invalid SSDataBlock type");
|
||||
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
||||
|
|
|
@ -1083,7 +1083,10 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
|
|||
return getBlockForEmptyTable(pOperator, pStart);
|
||||
}
|
||||
} else { // group by tag + no sort
|
||||
int32_t numOfTables = tableListGetSize(pTableListInfo);
|
||||
int32_t numOfTables = 0;
|
||||
code = tableListGetSize(pTableListInfo, &numOfTables);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (pTableScanInfo->tableEndIndex + 1 >= numOfTables) {
|
||||
// get empty group, mark processed & rm from hash
|
||||
void* pIte = taosHashIterate(pTableListInfo->remainGroups, NULL);
|
||||
|
@ -1172,7 +1175,10 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
|
|||
STableScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
||||
int32_t numOfTables = 0;
|
||||
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
|
||||
setOperatorCompleted(pOperator);
|
||||
if (pOperator->dynamicTask) {
|
||||
|
@ -1305,7 +1311,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
|
||||
// scan table one by one sequentially
|
||||
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
|
||||
int32_t numOfTables = 0; // tableListGetSize(pTaskInfo->pTableListInfo);
|
||||
int32_t numOfTables = 0;
|
||||
STableKeyInfo tInfo = {0};
|
||||
pInfo->countState = TABLE_COUNT_STATE_END;
|
||||
|
||||
|
@ -1320,7 +1326,13 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
pInfo->currentTable++;
|
||||
|
||||
taosRLockLatch(&pTaskInfo->lock);
|
||||
numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
||||
numOfTables = 0;
|
||||
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
lino = __LINE__;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
if (pInfo->currentTable >= numOfTables) {
|
||||
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
|
||||
|
@ -3613,7 +3625,9 @@ static int32_t extractTableIdList(const STableListInfo* pTableListInfo, SArray**
|
|||
QUERY_CHECK_NULL(tableIdList, code, lino, _end, terrno);
|
||||
|
||||
// Transfer the Array of STableKeyInfo into uid list.
|
||||
size_t size = tableListGetSize(pTableListInfo);
|
||||
int32_t size = 0;
|
||||
code = tableListGetSize(pTableListInfo, &size);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
|
||||
QUERY_CHECK_NULL(pkeyInfo, code, lino, _end, terrno);
|
||||
|
@ -4629,7 +4643,13 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
SSDataBlock* pRes = pInfo->pRes;
|
||||
blockDataCleanup(pRes);
|
||||
|
||||
int32_t size = tableListGetSize(pInfo->pTableListInfo);
|
||||
int32_t size = 0;
|
||||
code = tableListGetSize(pInfo->pTableListInfo, &size);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
if (size == 0) {
|
||||
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||
(*ppRes) = NULL;
|
||||
|
@ -4927,7 +4947,13 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
|
|||
static int32_t setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
|
||||
pInfo->bGroupProcessed = false;
|
||||
|
||||
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
||||
int32_t numOfTables = 0;
|
||||
int32_t code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t i = pInfo->tableStartIndex + 1;
|
||||
for (; i < numOfTables; ++i) {
|
||||
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
|
||||
|
@ -5266,7 +5292,10 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo);
|
||||
int32_t tableListSize = 0;
|
||||
code = tableListGetSize(pInfo->base.pTableListInfo, &tableListSize);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (!pInfo->hasGroupId) {
|
||||
pInfo->hasGroupId = true;
|
||||
|
||||
|
@ -5643,7 +5672,10 @@ void startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
qDebug("%s table merge scan start group %" PRIu64, GET_TASKID(pTaskInfo), pInfo->groupId);
|
||||
|
||||
{
|
||||
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
||||
int32_t numOfTables = 0;
|
||||
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
int32_t i = pInfo->tableStartIndex + 1;
|
||||
for (; i < numOfTables; ++i) {
|
||||
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
|
||||
|
@ -5763,7 +5795,10 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo);
|
||||
int32_t tableListSize = 0;
|
||||
code = tableListGetSize(pInfo->base.pTableListInfo, &tableListSize);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (!pInfo->hasGroupId) {
|
||||
pInfo->hasGroupId = true;
|
||||
|
||||
|
|
|
@ -2843,7 +2843,10 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pInfo->pTableListInfo = pTableListInfo;
|
||||
size_t num = tableListGetSize(pTableListInfo);
|
||||
int32_t num = 0;
|
||||
code = tableListGetSize(pTableListInfo, &num);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
void* pList = tableListGetInfo(pTableListInfo, 0);
|
||||
|
||||
code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock,
|
||||
|
|
|
@ -324,8 +324,11 @@ _end:
|
|||
|
||||
static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull);
|
||||
|
||||
static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SRowVal* pRowVal, bool reset) {
|
||||
static int32_t copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SRowVal* pRowVal, bool reset) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
|
||||
QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno);
|
||||
pRowVal->key = ((int64_t*)pTsCol->pData)[rowIndex];
|
||||
|
||||
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
|
@ -342,15 +345,24 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SRowVa
|
|||
}
|
||||
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
|
||||
QUERY_CHECK_NULL(pSrcCol, code, lino, _end, terrno);
|
||||
|
||||
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
|
||||
char* p = colDataGetData(pSrcCol, rowIndex);
|
||||
|
||||
saveColData(pRowVal->pRowVal, i, p, reset ? true : isNull);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t outputRows) {
|
||||
|
@ -362,20 +374,18 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
|
|||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
|
||||
bool ascFill = FILL_IS_ASC_FILL(pFillInfo);
|
||||
|
||||
#if 0
|
||||
ASSERT(ascFill && (pFillInfo->currentKey >= pFillInfo->start) || (!ascFill && (pFillInfo->currentKey <= pFillInfo->start)));
|
||||
#endif
|
||||
|
||||
while (pFillInfo->numOfCurrent < outputRows) {
|
||||
int64_t ts = ((int64_t*)pTsCol->pData)[pFillInfo->index];
|
||||
|
||||
// set the next value for interpolation
|
||||
if (pFillInfo->currentKey < ts && ascFill) {
|
||||
SRowVal* pRVal = pFillInfo->type == TSDB_FILL_NEXT ? &pFillInfo->next : &pFillInfo->prev;
|
||||
copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pRVal, false);
|
||||
code = copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pRVal, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
} else if (pFillInfo->currentKey > ts && !ascFill) {
|
||||
SRowVal* pRVal = pFillInfo->type == TSDB_FILL_NEXT ? &pFillInfo->prev : &pFillInfo->next;
|
||||
copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pRVal, false);
|
||||
code = copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pRVal, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
if (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) &&
|
||||
|
@ -392,21 +402,24 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
|
|||
goto _end;
|
||||
}
|
||||
} else {
|
||||
ASSERT(pFillInfo->currentKey == ts);
|
||||
QUERY_CHECK_CONDITION((pFillInfo->currentKey == ts), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||
int32_t index = pBlock->info.rows;
|
||||
|
||||
int32_t nextRowIndex = pFillInfo->index + 1;
|
||||
if (pFillInfo->type == TSDB_FILL_NEXT) {
|
||||
if ((pFillInfo->index + 1) < pFillInfo->numOfRows) {
|
||||
copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, false);
|
||||
code = copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
} else {
|
||||
// reset to null after last row
|
||||
copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, true);
|
||||
code = copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, true);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
}
|
||||
if (pFillInfo->type == TSDB_FILL_PREV) {
|
||||
if (nextRowIndex + 1 >= pFillInfo->numOfRows && !FILL_IS_ASC_FILL(pFillInfo)) {
|
||||
copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, true);
|
||||
code = copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, true);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -500,7 +513,9 @@ static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bo
|
|||
}
|
||||
}
|
||||
|
||||
static void appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int64_t resultCapacity) {
|
||||
static int32_t appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int64_t resultCapacity) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
/*
|
||||
* These data are generated according to fill strategy, since the current timestamp is out of the time window of
|
||||
* real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
|
||||
|
@ -512,7 +527,14 @@ static void appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int64_
|
|||
|
||||
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
|
||||
|
||||
ASSERT(pFillInfo->numOfCurrent == resultCapacity);
|
||||
QUERY_CHECK_CONDITION((pFillInfo->numOfCurrent == resultCapacity), code, lino, _end,
|
||||
TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
|
||||
|
@ -635,15 +657,6 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
|
|||
|
||||
// the endKey is now the aligned time window value. truncate time window isn't correct.
|
||||
pFillInfo->end = endKey;
|
||||
|
||||
#if 0
|
||||
if (pFillInfo->order == TSDB_ORDER_ASC) {
|
||||
ASSERT(pFillInfo->start <= pFillInfo->end);
|
||||
} else {
|
||||
ASSERT(pFillInfo->start >= pFillInfo->end);
|
||||
}
|
||||
#endif
|
||||
|
||||
pFillInfo->index = 0;
|
||||
pFillInfo->numOfRows = numOfRows;
|
||||
}
|
||||
|
@ -687,7 +700,6 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
|
|||
numOfRes =
|
||||
taosTimeCountIntervalForFill(lastKey, pFillInfo->currentKey, pFillInfo->interval.sliding,
|
||||
pFillInfo->interval.slidingUnit, pFillInfo->interval.precision, pFillInfo->order);
|
||||
ASSERT(numOfRes >= numOfRows);
|
||||
} else { // reach the end of data
|
||||
if ((ekey1 < pFillInfo->currentKey && FILL_IS_ASC_FILL(pFillInfo)) ||
|
||||
(ekey1 > pFillInfo->currentKey && !FILL_IS_ASC_FILL(pFillInfo))) {
|
||||
|
@ -719,23 +731,30 @@ void taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* po
|
|||
|
||||
int32_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
int32_t remain = taosNumOfRemainRows(pFillInfo);
|
||||
|
||||
int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity);
|
||||
ASSERT(numOfRes <= capacity);
|
||||
QUERY_CHECK_CONDITION((numOfRes <= capacity), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||
|
||||
// no data existed for fill operation now, append result according to the fill strategy
|
||||
if (remain == 0) {
|
||||
appendFilledResult(pFillInfo, p, numOfRes);
|
||||
code = appendFilledResult(pFillInfo, p, numOfRes);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
} else {
|
||||
code = fillResultImpl(pFillInfo, p, (int32_t)numOfRes);
|
||||
ASSERT(numOfRes == pFillInfo->numOfCurrent);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
QUERY_CHECK_CONDITION((numOfRes == pFillInfo->numOfCurrent), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
qDebug("fill:%p, generated fill result, src block:%d, index:%d, brange:%" PRId64 "-%" PRId64 ", currentKey:%" PRId64
|
||||
", current : % d, total : % d, %s",
|
||||
pFillInfo, pFillInfo->numOfRows, pFillInfo->index, pFillInfo->start, pFillInfo->end, pFillInfo->currentKey,
|
||||
pFillInfo->numOfCurrent, pFillInfo->numOfTotal, pFillInfo->id);
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -318,8 +318,6 @@ void fstStateCompileForAnyTrans(IdxFstFile* w, CompiledAddr addr, FstBuilderNode
|
|||
|
||||
// set_comm_input
|
||||
void fstStateSetCommInput(FstState* s, uint8_t inp) {
|
||||
// ASSERT(s->state == OneTransNext || s->state == OneTrans);
|
||||
|
||||
uint8_t val;
|
||||
COMMON_INDEX(inp, 0b111111, val);
|
||||
s->val = (s->val & fstStateDict[s->state].val) | val;
|
||||
|
@ -327,7 +325,6 @@ void fstStateSetCommInput(FstState* s, uint8_t inp) {
|
|||
|
||||
// comm_input
|
||||
uint8_t fstStateCommInput(FstState* s, bool* null) {
|
||||
// ASSERT(s->state == OneTransNext || s->state == OneTrans);
|
||||
uint8_t v = s->val & 0b00111111;
|
||||
if (v == 0) {
|
||||
*null = true;
|
||||
|
@ -340,7 +337,6 @@ uint8_t fstStateCommInput(FstState* s, bool* null) {
|
|||
// input_len
|
||||
|
||||
uint64_t fstStateInputLen(FstState* s) {
|
||||
// ASSERT(s->state == OneTransNext || s->state == OneTrans);
|
||||
bool null = false;
|
||||
(void)fstStateCommInput(s, &null);
|
||||
return null ? 1 : 0;
|
||||
|
@ -348,11 +344,9 @@ uint64_t fstStateInputLen(FstState* s) {
|
|||
|
||||
// end_addr
|
||||
uint64_t fstStateEndAddrForOneTransNext(FstState* s, FstSlice* data) {
|
||||
// ASSERT(s->state == OneTransNext);
|
||||
return FST_SLICE_LEN(data) - 1 - fstStateInputLen(s);
|
||||
}
|
||||
uint64_t fstStateEndAddrForOneTrans(FstState* s, FstSlice* data, PackSizes sizes) {
|
||||
// ASSERT(s->state == OneTrans);
|
||||
return FST_SLICE_LEN(data) - 1 - fstStateInputLen(s) - 1 // pack size
|
||||
- FST_GET_TRANSITION_PACK_SIZE(sizes) - FST_GET_OUTPUT_PACK_SIZE(sizes);
|
||||
}
|
||||
|
@ -366,7 +360,6 @@ uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice*
|
|||
}
|
||||
// input
|
||||
uint8_t fstStateInput(FstState* s, FstNode* node) {
|
||||
// ASSERT(s->state == OneTransNext || s->state == OneTrans);
|
||||
FstSlice* slice = &node->data;
|
||||
bool null = false;
|
||||
uint8_t inp = fstStateCommInput(s, &null);
|
||||
|
@ -374,7 +367,6 @@ uint8_t fstStateInput(FstState* s, FstNode* node) {
|
|||
return null == false ? inp : data[node->start - 1];
|
||||
}
|
||||
uint8_t fstStateInputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
|
||||
// ASSERT(s->state == AnyTrans);
|
||||
FstSlice* slice = &node->data;
|
||||
|
||||
uint64_t at = node->start - fstStateNtransLen(s) - 1 // pack size
|
||||
|
@ -386,7 +378,6 @@ uint8_t fstStateInputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
|
|||
|
||||
// trans_addr
|
||||
CompiledAddr fstStateTransAddr(FstState* s, FstNode* node) {
|
||||
// ASSERT(s->state == OneTransNext || s->state == OneTrans);
|
||||
FstSlice* slice = &node->data;
|
||||
if (s->state == OneTransNext) {
|
||||
return (CompiledAddr)(node->end) - 1;
|
||||
|
@ -402,8 +393,6 @@ CompiledAddr fstStateTransAddr(FstState* s, FstNode* node) {
|
|||
}
|
||||
}
|
||||
CompiledAddr fstStateTransAddrForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
|
||||
// ASSERT(s->state == AnyTrans);
|
||||
|
||||
FstSlice* slice = &node->data;
|
||||
uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(node->sizes);
|
||||
uint64_t at = node->start - fstStateNtransLen(s) - 1 - fstStateTransIndexSize(s, node->version, node->nTrans) -
|
||||
|
@ -414,7 +403,6 @@ CompiledAddr fstStateTransAddrForAnyTrans(FstState* s, FstNode* node, uint64_t i
|
|||
|
||||
// sizes
|
||||
PackSizes fstStateSizes(FstState* s, FstSlice* slice) {
|
||||
/// ASSERT(s->state == OneTrans || s->state == AnyTrans);
|
||||
uint64_t i;
|
||||
if (s->state == OneTrans) {
|
||||
i = FST_SLICE_LEN(slice) - 1 - fstStateInputLen(s) - 1;
|
||||
|
@ -427,8 +415,6 @@ PackSizes fstStateSizes(FstState* s, FstSlice* slice) {
|
|||
}
|
||||
// Output
|
||||
Output fstStateOutput(FstState* s, FstNode* node) {
|
||||
// ASSERT(s->state == OneTrans);
|
||||
|
||||
uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(node->sizes);
|
||||
if (oSizes == 0) {
|
||||
return 0;
|
||||
|
@ -441,8 +427,6 @@ Output fstStateOutput(FstState* s, FstNode* node) {
|
|||
return unpackUint64(data + i, oSizes);
|
||||
}
|
||||
Output fstStateOutputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
|
||||
// ASSERT(s->state == AnyTrans);
|
||||
|
||||
uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(node->sizes);
|
||||
if (oSizes == 0) {
|
||||
return 0;
|
||||
|
@ -458,19 +442,14 @@ Output fstStateOutputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
|
|||
// anyTrans specify function
|
||||
|
||||
void fstStateSetFinalState(FstState* s, bool yes) {
|
||||
// ASSERT(s->state == AnyTrans);
|
||||
if (yes) {
|
||||
s->val |= 0b01000000;
|
||||
}
|
||||
return;
|
||||
}
|
||||
bool fstStateIsFinalState(FstState* s) {
|
||||
// ASSERT(s->state == AnyTrans);
|
||||
return (s->val & 0b01000000) == 0b01000000;
|
||||
}
|
||||
bool fstStateIsFinalState(FstState* s) { return (s->val & 0b01000000) == 0b01000000; }
|
||||
|
||||
void fstStateSetStateNtrans(FstState* s, uint8_t n) {
|
||||
// ASSERT(s->state == AnyTrans);
|
||||
if (n <= 0b00111111) {
|
||||
s->val = (s->val & 0b11000000) | n;
|
||||
}
|
||||
|
@ -478,7 +457,6 @@ void fstStateSetStateNtrans(FstState* s, uint8_t n) {
|
|||
}
|
||||
// state_ntrans
|
||||
uint8_t fstStateStateNtrans(FstState* s, bool* null) {
|
||||
// ASSERT(s->state == AnyTrans);
|
||||
*null = false;
|
||||
uint8_t n = s->val & 0b00111111;
|
||||
|
||||
|
@ -488,16 +466,13 @@ uint8_t fstStateStateNtrans(FstState* s, bool* null) {
|
|||
return n;
|
||||
}
|
||||
uint64_t fstStateTotalTransSize(FstState* s, uint64_t version, PackSizes sizes, uint64_t nTrans) {
|
||||
// ASSERT(s->state == AnyTrans);
|
||||
uint64_t idxSize = fstStateTransIndexSize(s, version, nTrans);
|
||||
return nTrans + (nTrans * FST_GET_TRANSITION_PACK_SIZE(sizes)) + idxSize;
|
||||
}
|
||||
uint64_t fstStateTransIndexSize(FstState* s, uint64_t version, uint64_t nTrans) {
|
||||
// ASSERT(s->state == AnyTrans);
|
||||
return (version >= 2 && nTrans > TRANS_INDEX_THRESHOLD) ? 256 : 0;
|
||||
}
|
||||
uint64_t fstStateNtransLen(FstState* s) {
|
||||
// ASSERT(s->state == AnyTrans);
|
||||
bool null = false;
|
||||
(void)fstStateStateNtrans(s, &null);
|
||||
return null == true ? 1 : 0;
|
||||
|
@ -526,7 +501,6 @@ Output fstStateFinalOutput(FstState* s, uint64_t version, FstSlice* slice, PackS
|
|||
return unpackUint64(data + at, (uint8_t)oSizes);
|
||||
}
|
||||
uint64_t fstStateFindInput(FstState* s, FstNode* node, uint8_t b, bool* null) {
|
||||
// ASSERT(s->state == AnyTrans);
|
||||
FstSlice* slice = &node->data;
|
||||
if (node->version >= 2 && node->nTrans > TRANS_INDEX_THRESHOLD) {
|
||||
uint64_t at = node->start - fstStateNtransLen(s) - 1 // pack size
|
||||
|
@ -672,17 +646,14 @@ bool fstNodeGetTransitionAddrAt(FstNode* node, uint64_t i, CompiledAddr* res) {
|
|||
bool s = true;
|
||||
FstState* st = &node->state;
|
||||
if (st->state == OneTransNext) {
|
||||
/// ASSERT(i == 0);
|
||||
(void)fstStateTransAddr(st, node);
|
||||
} else if (st->state == OneTrans) {
|
||||
// ASSERT(i == 0);
|
||||
(void)fstStateTransAddr(st, node);
|
||||
} else if (st->state == AnyTrans) {
|
||||
(void)fstStateTransAddrForAnyTrans(st, node, i);
|
||||
} else if (FST_STATE_EMPTY_FINAL(node)) {
|
||||
s = false;
|
||||
} else {
|
||||
// ASSERT(0);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
@ -718,7 +689,6 @@ bool fstNodeFindInput(FstNode* node, uint8_t b, uint64_t* res) {
|
|||
|
||||
bool fstNodeCompile(FstNode* node, void* w, CompiledAddr lastAddr, CompiledAddr addr, FstBuilderNode* builderNode) {
|
||||
int32_t sz = taosArrayGetSize(builderNode->trans);
|
||||
// ASSERT(sz < 256);
|
||||
if (sz == 0 && builderNode->isFinal && builderNode->finalOutput == 0) {
|
||||
return true;
|
||||
} else if (sz != 1 || builderNode->isFinal) {
|
||||
|
@ -800,7 +770,6 @@ void fstBuilderInsertOutput(FstBuilder* b, FstSlice bs, Output in) {
|
|||
uint64_t prefixLen = fstUnFinishedNodesFindCommPrefixAndSetOutput(b->unfinished, bs, in, &out);
|
||||
|
||||
if (prefixLen == FST_SLICE_LEN(s)) {
|
||||
// ASSERT(out == 0);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -844,7 +813,6 @@ void fstBuilderCompileFrom(FstBuilder* b, uint64_t istate) {
|
|||
addr = fstBuilderCompile(b, bn);
|
||||
|
||||
fstBuilderNodeDestroy(bn);
|
||||
// ASSERT(addr != NONE_ADDRESS);
|
||||
}
|
||||
fstUnFinishedNodesTopLastFreeze(b->unfinished, addr);
|
||||
return;
|
||||
|
|
|
@ -98,7 +98,6 @@ void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src) {
|
|||
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile *wrt, CompiledAddr lastAddr, CompiledAddr
|
||||
// startAddr) {
|
||||
// size_t sz = taosArrayGetSize(b->trans);
|
||||
// ASSERT(sz < 256);
|
||||
// if (FST_BUILDER_NODE_IS_FINAL(b)
|
||||
// && FST_BUILDER_NODE_TRANS_ISEMPTY(b)
|
||||
// && FST_BUILDER_NODE_FINALOUTPUT_ISZERO(b)) {
|
||||
|
|
|
@ -45,9 +45,6 @@
|
|||
//
|
||||
|
||||
static inline int64_t syncNodeAbs64(int64_t a, int64_t b) {
|
||||
ASSERT(a >= 0);
|
||||
ASSERT(b >= 0);
|
||||
|
||||
int64_t c = a > b ? a - b : b - a;
|
||||
return c;
|
||||
}
|
||||
|
|
|
@ -3523,6 +3523,9 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
return 0;
|
||||
}
|
||||
SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
|
||||
if (matchTerm < 0) {
|
||||
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
if (pMsg->currentTerm == matchTerm) {
|
||||
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
|
||||
}
|
||||
|
|
|
@ -46,8 +46,8 @@ int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
|
|||
|
||||
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
|
||||
int32_t code = 0;
|
||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||
(void)taosThreadMutexLock(&pBuf->mutex);
|
||||
(void)syncLogBufferValidate(pBuf);
|
||||
SyncIndex index = pEntry->index;
|
||||
|
||||
if (index - pBuf->startIndex >= pBuf->size) {
|
||||
|
@ -102,13 +102,13 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
pBuf->entries[index % pBuf->size] = tmp;
|
||||
pBuf->endIndex = index + 1;
|
||||
|
||||
(void)syncLogBufferValidate(pBuf);
|
||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
(void)syncLogBufferValidate(pBuf);
|
||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||
taosMsleep(1);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
@ -134,7 +134,11 @@ int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI
|
|||
|
||||
if (prevIndex >= pBuf->startIndex) {
|
||||
pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
|
||||
ASSERTS(pEntry != NULL, "no log entry found");
|
||||
if (pEntry == NULL) {
|
||||
sError("vgId:%d, failed to get pre log term since no log entry found", pNode->vgId);
|
||||
*pSyncTerm = -1;
|
||||
TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
|
||||
}
|
||||
prevLogTerm = pEntry->term;
|
||||
*pSyncTerm = prevLogTerm;
|
||||
return 0;
|
||||
|
@ -142,9 +146,18 @@ int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI
|
|||
|
||||
if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
|
||||
int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs;
|
||||
ASSERTS(timeMs != 0, "no log entry found");
|
||||
if (timeMs == 0) {
|
||||
sError("vgId:%d, failed to get pre log term since timeMs is 0", pNode->vgId);
|
||||
*pSyncTerm = -1;
|
||||
TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
|
||||
}
|
||||
prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term;
|
||||
ASSERT(prevIndex == 0 || prevLogTerm != 0);
|
||||
if (!(prevIndex == 0 || prevLogTerm != 0)) {
|
||||
sError("vgId:%d, failed to get pre log term prevIndex:%" PRId64 ", prevLogTerm:%" PRId64, pNode->vgId, prevIndex,
|
||||
prevLogTerm);
|
||||
*pSyncTerm = -1;
|
||||
TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
|
||||
}
|
||||
*pSyncTerm = prevLogTerm;
|
||||
return 0;
|
||||
}
|
||||
|
@ -289,7 +302,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
|||
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
||||
|
||||
// validate
|
||||
(void)syncLogBufferValidate(pBuf);
|
||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||
return 0;
|
||||
|
||||
_exit:
|
||||
|
@ -307,8 +320,8 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
|||
}
|
||||
|
||||
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||
(void)taosThreadMutexLock(&pBuf->mutex);
|
||||
(void)syncLogBufferValidate(pBuf);
|
||||
for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) {
|
||||
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
|
||||
if (pEntry == NULL) continue;
|
||||
|
@ -321,15 +334,19 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
|||
if (code < 0) {
|
||||
sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code));
|
||||
}
|
||||
(void)syncLogBufferValidate(pBuf);
|
||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||
return code;
|
||||
}
|
||||
|
||||
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) {
|
||||
SyncIndex index = pBuf->matchIndex;
|
||||
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
|
||||
ASSERT(pEntry != NULL);
|
||||
if (pEntry == NULL) {
|
||||
sError("failed to get last match term since entry is null");
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
return pEntry->term;
|
||||
}
|
||||
|
||||
|
@ -348,8 +365,8 @@ bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) {
|
|||
}
|
||||
|
||||
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
|
||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||
(void)taosThreadMutexLock(&pBuf->mutex);
|
||||
(void)syncLogBufferValidate(pBuf);
|
||||
int32_t code = 0;
|
||||
SyncIndex index = pEntry->index;
|
||||
SyncIndex prevIndex = pEntry->index - 1;
|
||||
|
@ -357,6 +374,12 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
SSyncRaftEntry* pExist = NULL;
|
||||
bool inBuf = true;
|
||||
|
||||
if (lastMatchTerm < 0) {
|
||||
sError("vgId:%d, failed to accept, lastMatchTerm:%" PRId64, pNode->vgId, lastMatchTerm);
|
||||
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
goto _out;
|
||||
}
|
||||
|
||||
if (index <= pBuf->commitIndex) {
|
||||
sTrace("vgId:%d, already committed. index:%" PRId64 ", term:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64
|
||||
" %" PRId64 ", %" PRId64 ")",
|
||||
|
@ -364,7 +387,11 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
pBuf->endIndex);
|
||||
SyncTerm term = -1;
|
||||
code = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1, &term);
|
||||
ASSERT(pEntry->term >= 0);
|
||||
if (pEntry->term < 0) {
|
||||
sError("vgId:%d, failed to accept, pEntry->term:%" PRId64, pNode->vgId, pEntry->term);
|
||||
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
goto _out;
|
||||
}
|
||||
if (term == pEntry->term) {
|
||||
code = 0;
|
||||
}
|
||||
|
@ -401,7 +428,12 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
// check current in buffer
|
||||
code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pExist);
|
||||
if (pExist != NULL) {
|
||||
ASSERT(pEntry->index == pExist->index);
|
||||
if (pEntry->index != pExist->index) {
|
||||
sError("vgId:%d, failed to accept, pEntry->index:%" PRId64 ", pExist->index:%" PRId64, pNode->vgId, pEntry->index,
|
||||
pExist->index);
|
||||
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
goto _out;
|
||||
}
|
||||
if (pEntry->term != pExist->term) {
|
||||
(void)syncLogBufferRollback(pBuf, pNode, index);
|
||||
} else {
|
||||
|
@ -411,7 +443,14 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
pBuf->endIndex);
|
||||
SyncTerm existPrevTerm = -1;
|
||||
(void)syncLogReplGetPrevLogTerm(NULL, pNode, index, &existPrevTerm);
|
||||
ASSERT(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm));
|
||||
if (!(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm))) {
|
||||
sError("vgId:%d, failed to accept, pEntry->term:%" PRId64 ", pExist->indexpExist->term:%" PRId64
|
||||
", pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64 ", prevTerm:%" PRId64
|
||||
", existPrevTerm:%" PRId64,
|
||||
pNode->vgId, pEntry->term, pExist->term, pEntry->index, pBuf->matchIndex, prevTerm, existPrevTerm);
|
||||
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
goto _out;
|
||||
}
|
||||
code = 0;
|
||||
goto _out;
|
||||
}
|
||||
|
@ -446,8 +485,8 @@ _out:
|
|||
syncEntryDestroy(pExist);
|
||||
pExist = NULL;
|
||||
}
|
||||
(void)syncLogBufferValidate(pBuf);
|
||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
|
@ -479,8 +518,8 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaf
|
|||
}
|
||||
|
||||
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char* str) {
|
||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||
(void)taosThreadMutexLock(&pBuf->mutex);
|
||||
(void)syncLogBufferValidate(pBuf);
|
||||
|
||||
SSyncLogStore* pLogStore = pNode->pLogStore;
|
||||
int64_t matchIndex = pBuf->matchIndex;
|
||||
|
@ -488,7 +527,11 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
|
|||
|
||||
while (pBuf->matchIndex + 1 < pBuf->endIndex) {
|
||||
int64_t index = pBuf->matchIndex + 1;
|
||||
ASSERT(index >= 0);
|
||||
if (index < 0) {
|
||||
sError("vgId:%d, failed to proceed index:%" PRId64, pNode->vgId, index);
|
||||
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
goto _out;
|
||||
}
|
||||
|
||||
// try to proceed
|
||||
SSyncLogBufEntry* pBufEntry = &pBuf->entries[index % pBuf->size];
|
||||
|
@ -501,14 +544,37 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
|
|||
goto _out;
|
||||
}
|
||||
|
||||
ASSERT(index == pEntry->index);
|
||||
if (index != pEntry->index) {
|
||||
sError("vgId:%d, failed to proceed index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId, index, pEntry->index);
|
||||
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
goto _out;
|
||||
}
|
||||
|
||||
// match
|
||||
SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
|
||||
ASSERT(pMatch != NULL);
|
||||
ASSERT(pMatch->index == pBuf->matchIndex);
|
||||
ASSERT(pMatch->index + 1 == pEntry->index);
|
||||
ASSERT(prevLogIndex == pMatch->index);
|
||||
if (pMatch == NULL) {
|
||||
sError("vgId:%d, failed to proceed since pMatch is null", pNode->vgId);
|
||||
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
goto _out;
|
||||
}
|
||||
if (pMatch->index != pBuf->matchIndex) {
|
||||
sError("vgId:%d, failed to proceed, pMatch->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
|
||||
pMatch->index, pBuf->matchIndex);
|
||||
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
goto _out;
|
||||
}
|
||||
if (pMatch->index + 1 != pEntry->index) {
|
||||
sError("vgId:%d, failed to proceed, pMatch->index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId,
|
||||
pMatch->index, pEntry->index);
|
||||
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
goto _out;
|
||||
}
|
||||
if (prevLogIndex != pMatch->index) {
|
||||
sError("vgId:%d, failed to proceed, prevLogIndex:%" PRId64 ", pMatch->index:%" PRId64, pNode->vgId, prevLogIndex,
|
||||
pMatch->index);
|
||||
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
goto _out;
|
||||
}
|
||||
|
||||
if (pMatch->term != prevLogTerm) {
|
||||
sInfo(
|
||||
|
@ -567,7 +633,12 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
|
|||
// replicate on demand
|
||||
(void)syncNodeReplicateWithoutLock(pNode);
|
||||
|
||||
ASSERT(pEntry->index == pBuf->matchIndex);
|
||||
if (pEntry->index != pBuf->matchIndex) {
|
||||
sError("vgId:%d, failed to proceed, pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
|
||||
pEntry->index, pBuf->matchIndex);
|
||||
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
goto _out;
|
||||
}
|
||||
|
||||
// update my match index
|
||||
matchIndex = pBuf->matchIndex;
|
||||
|
@ -579,8 +650,8 @@ _out:
|
|||
if (pMatchTerm) {
|
||||
*pMatchTerm = pBuf->entries[(matchIndex + pBuf->size) % pBuf->size].pItem->term;
|
||||
}
|
||||
(void)syncLogBufferValidate(pBuf);
|
||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||
return matchIndex;
|
||||
}
|
||||
|
||||
|
@ -643,17 +714,36 @@ _exit:
|
|||
}
|
||||
|
||||
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
|
||||
ASSERT(pBuf->startIndex <= pBuf->matchIndex);
|
||||
ASSERT(pBuf->commitIndex <= pBuf->matchIndex);
|
||||
ASSERT(pBuf->matchIndex < pBuf->endIndex);
|
||||
ASSERT(pBuf->endIndex - pBuf->startIndex <= pBuf->size);
|
||||
ASSERT(pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem);
|
||||
if (pBuf->startIndex > pBuf->matchIndex) {
|
||||
sError("failed to validate, pBuf->startIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->startIndex,
|
||||
pBuf->matchIndex);
|
||||
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
if (pBuf->commitIndex > pBuf->matchIndex) {
|
||||
sError("failed to validate, pBuf->commitIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->commitIndex,
|
||||
pBuf->matchIndex);
|
||||
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
if (pBuf->matchIndex >= pBuf->endIndex) {
|
||||
sError("failed to validate, pBuf->matchIndex:%" PRId64 ", pBuf->endIndex:%" PRId64, pBuf->matchIndex,
|
||||
pBuf->endIndex);
|
||||
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
if (pBuf->endIndex - pBuf->startIndex > pBuf->size) {
|
||||
sError("failed to validate, pBuf->endIndex:%" PRId64 ", pBuf->startIndex:%" PRId64 ", pBuf->size:%" PRId64,
|
||||
pBuf->endIndex, pBuf->startIndex, pBuf->size);
|
||||
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
if (pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem == NULL) {
|
||||
sError("failed to validate since pItem is null");
|
||||
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex) {
|
||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||
(void)taosThreadMutexLock(&pBuf->mutex);
|
||||
(void)syncLogBufferValidate(pBuf);
|
||||
|
||||
SSyncLogStore* pLogStore = pNode->pLogStore;
|
||||
SSyncFSM* pFsm = pNode->pFsm;
|
||||
|
@ -778,15 +868,18 @@ _out:
|
|||
syncEntryDestroy(pNextEntry);
|
||||
pNextEntry = NULL;
|
||||
}
|
||||
(void)syncLogBufferValidate(pBuf);
|
||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
void syncLogReplReset(SSyncLogReplMgr* pMgr) {
|
||||
if (pMgr == NULL) return;
|
||||
|
||||
ASSERT(pMgr->startIndex >= 0);
|
||||
if (pMgr->startIndex < 0) {
|
||||
sError("failed to reset, pMgr->startIndex:%" PRId64, pMgr->startIndex);
|
||||
return;
|
||||
}
|
||||
for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
|
||||
(void)memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0]));
|
||||
}
|
||||
|
@ -1285,13 +1378,13 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
|
|||
}
|
||||
|
||||
if (pBuf->endIndex != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
(void)syncLogBufferValidate(pBuf);
|
||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||
(void)taosThreadMutexLock(&pBuf->mutex);
|
||||
(void)syncLogBufferValidate(pBuf);
|
||||
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
|
||||
if (lastVer != pBuf->matchIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
SyncIndex index = pBuf->endIndex - 1;
|
||||
|
@ -1308,8 +1401,8 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
|||
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
|
||||
syncLogReplReset(pMgr);
|
||||
}
|
||||
(void)syncLogBufferValidate(pBuf);
|
||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -79,6 +79,7 @@ _error:
|
|||
|
||||
int32_t tBloomFilterPutHash(SBloomFilter* pBF, uint64_t hash1, uint64_t hash2) {
|
||||
if (tBloomFilterIsFull(pBF)) {
|
||||
uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_INVALID_PARA));
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
bool hasChange = false;
|
||||
|
|
|
@ -118,7 +118,7 @@ int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len, int3
|
|||
}
|
||||
|
||||
SBloomFilter* pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
|
||||
ASSERT(pNormalBf);
|
||||
QUERY_CHECK_NULL(pNormalBf, code, lino, _end, terrno);
|
||||
if (tBloomFilterIsFull(pNormalBf)) {
|
||||
code = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
|
||||
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);
|
||||
|
|
|
@ -0,0 +1,192 @@
|
|||
from frame.log import *
|
||||
from frame.cases import *
|
||||
from frame.sql import *
|
||||
from frame.caseBase import *
|
||||
from frame import *
|
||||
from frame.eos import *
|
||||
|
||||
|
||||
class TDTestCase(TBase):
|
||||
"""Add test case to verify TD-30816 (last/last_row accuracy)
|
||||
"""
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor())
|
||||
|
||||
def prepare_data(self):
|
||||
tdSql.execute("create database db_td30816 cachemodel 'both';")
|
||||
tdSql.execute("use db_td30816;")
|
||||
# create regular table
|
||||
tdSql.execute("create table rt_int (ts timestamp, c1 int primary key, c2 int);")
|
||||
tdSql.execute("create table rt_str (ts timestamp, c1 varchar(16) primary key, c2 varchar(16));")
|
||||
|
||||
# create stable
|
||||
tdSql.execute("create table st_pk_int (ts timestamp, c1 int primary key, c2 int) tags (t1 int);")
|
||||
tdSql.execute("create table st_pk_str (ts timestamp, c1 varchar(16) primary key, c2 varchar(16)) tags (t1 int);")
|
||||
|
||||
# create child table
|
||||
tdSql.execute("create table ct1 using st_pk_int tags(1);")
|
||||
tdSql.execute("create table ct2 using st_pk_int tags(2);")
|
||||
|
||||
tdSql.execute("create table ct3 using st_pk_str tags(3);")
|
||||
tdSql.execute("create table ct4 using st_pk_str tags(4);")
|
||||
|
||||
# insert data to regular table
|
||||
tdSql.execute("insert into rt_int values ('2021-01-01 00:00:00', 1, NULL);")
|
||||
tdSql.execute("insert into rt_int values ('2021-01-01 00:00:01', 2, 1);")
|
||||
tdSql.execute("insert into rt_str values ('2021-01-01 00:00:00', 'a', NULL);")
|
||||
tdSql.execute("insert into rt_str values ('2021-01-01 00:00:01', 'b', '1');")
|
||||
|
||||
# insert data to child table
|
||||
tdSql.execute("insert into ct1 values ('2021-01-01 00:00:00', 1, 1);")
|
||||
tdSql.execute("insert into ct1 values ('2021-01-01 00:00:01', 2, NULL);")
|
||||
tdSql.execute("insert into ct2 values ('2021-01-01 00:00:00', 3, 3);")
|
||||
tdSql.execute("insert into ct2 values ('2021-01-01 00:00:01', 4, NULL);")
|
||||
|
||||
tdSql.execute("insert into ct3 values ('2021-01-01 00:00:00', 'a', '1');")
|
||||
tdSql.execute("insert into ct3 values ('2021-01-01 00:00:01', 'b', NULL);")
|
||||
tdSql.execute("insert into ct4 values ('2021-01-01 00:00:00', 'c', '3');")
|
||||
tdSql.execute("insert into ct4 values ('2021-01-01 00:00:01', 'd', NULL);")
|
||||
|
||||
def test_last_with_primarykey_int_ct(self):
|
||||
tdSql.execute("use db_td30816;")
|
||||
tdSql.query("select last(*) from st_pk_int;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '2021-01-01 00:00:01')
|
||||
tdSql.checkData(0, 1, 4)
|
||||
tdSql.checkData(0, 2, 3)
|
||||
|
||||
tdSql.query("select last_row(*) from st_pk_int;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '2021-01-01 00:00:01')
|
||||
tdSql.checkData(0, 1, 4)
|
||||
tdSql.checkData(0, 2, None)
|
||||
|
||||
# delete and insert data
|
||||
tdSql.execute("delete from ct1 where ts='2021-01-01 00:00:01';")
|
||||
tdSql.execute("delete from ct2 where ts='2021-01-01 00:00:01';")
|
||||
tdSql.execute("insert into ct1 values ('2021-01-01 00:00:00', 0, 5);")
|
||||
tdSql.execute("insert into ct2 values ('2021-01-01 00:00:00', -1, 6);")
|
||||
tdSql.query("select last(*) from st_pk_int;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '2021-01-01 00:00:00')
|
||||
tdSql.checkData(0, 1, 3)
|
||||
tdSql.checkData(0, 2, 3)
|
||||
|
||||
tdSql.query("select last_row(*) from st_pk_int;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '2021-01-01 00:00:00')
|
||||
tdSql.checkData(0, 1, 3)
|
||||
tdSql.checkData(0, 2, 3)
|
||||
tdLog.info("Finish test_last_with_primarykey_int_ct")
|
||||
|
||||
def test_last_with_primarykey_str_ct(self):
|
||||
tdSql.execute("use db_td30816;")
|
||||
tdSql.query("select last(*) from st_pk_str;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '2021-01-01 00:00:01')
|
||||
tdSql.checkData(0, 1, 'd')
|
||||
tdSql.checkData(0, 2, '3')
|
||||
|
||||
tdSql.query("select last_row(*) from st_pk_str;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '2021-01-01 00:00:01')
|
||||
tdSql.checkData(0, 1, 'd')
|
||||
tdSql.checkData(0, 2, None)
|
||||
|
||||
# delete and insert data
|
||||
tdSql.execute("delete from ct3 where ts='2021-01-01 00:00:01';")
|
||||
tdSql.execute("delete from ct4 where ts='2021-01-01 00:00:01';")
|
||||
tdSql.execute("insert into ct3 values ('2021-01-01 00:00:00', '6', '5');")
|
||||
tdSql.execute("insert into ct4 values ('2021-01-01 00:00:00', '7', '6');")
|
||||
|
||||
tdSql.query("select last(*) from st_pk_str;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '2021-01-01 00:00:00')
|
||||
tdSql.checkData(0, 1, 'c')
|
||||
tdSql.checkData(0, 2, '3')
|
||||
|
||||
tdSql.query("select last_row(*) from st_pk_str;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '2021-01-01 00:00:00')
|
||||
tdSql.checkData(0, 1, 'c')
|
||||
tdSql.checkData(0, 2, 3)
|
||||
tdLog.info("Finish test_last_with_primarykey_str_ct")
|
||||
|
||||
def test_last_with_primarykey_int_rt(self):
|
||||
tdSql.execute("use db_td30816;")
|
||||
tdSql.query("select last(*) from rt_int;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '2021-01-01 00:00:01')
|
||||
tdSql.checkData(0, 1, 2)
|
||||
tdSql.checkData(0, 2, 1)
|
||||
|
||||
tdSql.query("select last_row(*) from rt_int;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '2021-01-01 00:00:01')
|
||||
tdSql.checkData(0, 1, 2)
|
||||
tdSql.checkData(0, 2, 1)
|
||||
|
||||
tdSql.execute("delete from rt_int where ts='2021-01-01 00:00:01';")
|
||||
tdSql.execute("insert into rt_int values ('2021-01-01 00:00:00', 0, 5);")
|
||||
|
||||
tdSql.query("select last(*) from rt_int;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '2021-01-01 00:00:00')
|
||||
tdSql.checkData(0, 1, 1)
|
||||
tdSql.checkData(0, 2, 5)
|
||||
|
||||
tdSql.query("select last_row(*) from rt_int;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '2021-01-01 00:00:00')
|
||||
tdSql.checkData(0, 1, 1)
|
||||
tdSql.checkData(0, 2, None)
|
||||
tdLog.info("Finish test_last_with_primarykey_int_rt")
|
||||
|
||||
def test_last_with_primarykey_str_rt(self):
|
||||
tdSql.execute("use db_td30816;")
|
||||
tdSql.query("select last(*) from rt_str;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '2021-01-01 00:00:01')
|
||||
tdSql.checkData(0, 1, 'b')
|
||||
tdSql.checkData(0, 2, '1')
|
||||
|
||||
tdSql.query("select last_row(*) from rt_str;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '2021-01-01 00:00:01')
|
||||
tdSql.checkData(0, 1, 'b')
|
||||
tdSql.checkData(0, 2, '1')
|
||||
|
||||
tdSql.execute("delete from rt_str where ts='2021-01-01 00:00:01';")
|
||||
tdSql.execute("insert into rt_str values ('2021-01-01 00:00:00', '2', '5');")
|
||||
|
||||
tdSql.query("select last(*) from rt_str;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '2021-01-01 00:00:00')
|
||||
tdSql.checkData(0, 1, 'a')
|
||||
tdSql.checkData(0, 2, '5')
|
||||
|
||||
tdSql.query("select last_row(*) from rt_str;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '2021-01-01 00:00:00')
|
||||
tdSql.checkData(0, 1, 'a')
|
||||
tdSql.checkData(0, 2, None)
|
||||
tdLog.info("Finish test_last_with_primarykey_str_rt")
|
||||
|
||||
def run(self):
|
||||
self.prepare_data()
|
||||
# regular table
|
||||
self.test_last_with_primarykey_int_rt()
|
||||
self.test_last_with_primarykey_str_rt()
|
||||
# child tables
|
||||
self.test_last_with_primarykey_int_ct()
|
||||
self.test_last_with_primarykey_str_ct()
|
||||
|
||||
def stop(self):
|
||||
tdSql.execute("drop database db_td30816;")
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -38,6 +38,7 @@
|
|||
,,y,army,./pytest.sh python3 ./test.py -f query/queryBugs.py -N 3
|
||||
,,y,army,./pytest.sh python3 ./test.py -f tmq/tmqBugs.py -N 3
|
||||
,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_compare_asc_desc.py
|
||||
,,y,army,./pytest.sh python3 ./test.py -f query/last/test_last.py
|
||||
|
||||
#
|
||||
# system test
|
||||
|
@ -310,7 +311,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/balance_vgroups_r1.py -N 6
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosShell.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosShellError.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosShellNetChk.py
|
||||
,,n,system-test,python3 ./test.py -f 0-others/taosShellNetChk.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/telemetry.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/backquote_check.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosdMonitor.py
|
||||
|
|
|
@ -49,6 +49,7 @@ fi
|
|||
|
||||
indirect_leak=$(cat ${LOG_DIR}/*.asan | grep "Indirect leak" | wc -l)
|
||||
python_error=$(cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l)
|
||||
python_taos_error=$(cat ${LOG_DIR}/*.info |grep "#" | grep -w "TDinternal" | wc -l)
|
||||
|
||||
# ignore
|
||||
|
||||
|
@ -84,16 +85,17 @@ echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m"
|
|||
echo -e "\033[44;32;1m"asan indirect_leak: $indirect_leak"\033[0m"
|
||||
echo -e "\033[44;32;1m"asan runtime error: $runtime_error"\033[0m"
|
||||
echo -e "\033[44;32;1m"asan python error: $python_error"\033[0m"
|
||||
echo -e "\033[44;32;1m"asan python taos error: $python_taos_error"\033[0m"
|
||||
|
||||
let "errors=$error_num+$memory_leak+$indirect_leak+$runtime_error+$python_error"
|
||||
let "errors=$error_num+$memory_leak+$indirect_leak+$runtime_error+$python_error+$python_taos_error"
|
||||
|
||||
if [ $errors -eq 0 ]; then
|
||||
echo -e "\033[44;32;1m"no asan errors"\033[0m"
|
||||
exit 0
|
||||
else
|
||||
echo -e "\033[44;31;1m"asan total errors: $errors"\033[0m"
|
||||
if [ $python_error -ne 0 ]; then
|
||||
cat ${LOG_DIR}/*.info
|
||||
if [ $python_error -ne 0 ] || [ $python_taos_error -ne 0 ] ; then
|
||||
cat ${LOG_DIR}/*.info |grep "#" | grep -w "TDinternal"
|
||||
fi
|
||||
cat ${LOG_DIR}/*.asan
|
||||
exit 1
|
||||
|
|
Loading…
Reference in New Issue