fix(tsdb): set correct initial value for compare
This commit is contained in:
parent
110ac5bd16
commit
55bb6ab341
|
@ -126,6 +126,7 @@ int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag);
|
||||||
int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag);
|
int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag);
|
||||||
void tRowGetKey(SRow *pRow, SRowKey *key);
|
void tRowGetKey(SRow *pRow, SRowKey *key);
|
||||||
int32_t tRowKeyCompare(const void *p1, const void *p2);
|
int32_t tRowKeyCompare(const void *p1, const void *p2);
|
||||||
|
int32_t tRowKeyAssign(SRowKey* pDst, SRowKey* pSrc);
|
||||||
|
|
||||||
// SRowIter ================================
|
// SRowIter ================================
|
||||||
int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter);
|
int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter);
|
||||||
|
|
|
@ -1286,6 +1286,27 @@ int32_t tRowKeyCompare(const void *p1, const void *p2) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tRowKeyAssign(SRowKey *pDst, SRowKey* pSrc) {
|
||||||
|
pDst->ts = pSrc->ts;
|
||||||
|
pDst->numOfPKs = pSrc->numOfPKs;
|
||||||
|
|
||||||
|
if (pSrc->numOfPKs > 0) {
|
||||||
|
for (int32_t i = 0; i < pSrc->numOfPKs; ++i) {
|
||||||
|
SValue *pVal = &pDst->pks[i];
|
||||||
|
pVal->type = pSrc->pks[i].type;
|
||||||
|
|
||||||
|
if (IS_NUMERIC_TYPE(pVal->type)) {
|
||||||
|
pVal->val = pSrc->pks[i].val;
|
||||||
|
} else {
|
||||||
|
memcpy(pVal->pData, pVal->pData, pVal->nData);
|
||||||
|
pVal->nData = pSrc->pks[i].nData;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
// STag ========================================
|
// STag ========================================
|
||||||
static int tTagValCmprFn(const void *p1, const void *p2) {
|
static int tTagValCmprFn(const void *p1, const void *p2) {
|
||||||
if (((STagVal *)p1)->cid < ((STagVal *)p2)->cid) {
|
if (((STagVal *)p1)->cid < ((STagVal *)p2)->cid) {
|
||||||
|
|
|
@ -1183,7 +1183,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// ASSERT(status == TASK_STATUS__HALT);
|
|
||||||
if (status != TASK_STATUS__HALT) {
|
if (status != TASK_STATUS__HALT) {
|
||||||
tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr);
|
tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr);
|
||||||
// streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
|
// streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
|
||||||
|
|
|
@ -130,14 +130,29 @@ STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, c
|
||||||
return *p;
|
return *p;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t initSRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len) {
|
static int32_t initSRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc) {
|
||||||
pKey->numOfPKs = numOfPks;
|
pKey->numOfPKs = numOfPks;
|
||||||
pKey->ts = ts;
|
pKey->ts = ts;
|
||||||
|
|
||||||
if (numOfPks > 0) {
|
if (numOfPks > 0) {
|
||||||
pKey->pks[0].type = type;
|
pKey->pks[0].type = type;
|
||||||
if (IS_NUMERIC_TYPE(pKey->pks[0].type)) {
|
if (IS_NUMERIC_TYPE(pKey->pks[0].type)) {
|
||||||
pKey->pks[0].val = INT64_MIN;
|
char* p = (char*)&pKey->pks[0].val;
|
||||||
|
if (asc) {
|
||||||
|
switch(pKey->pks[0].type) {
|
||||||
|
case TSDB_DATA_TYPE_BIGINT:*(int64_t*)p = INT64_MIN;break;
|
||||||
|
case TSDB_DATA_TYPE_INT:*(int32_t*)p = INT32_MIN;break;
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT:*(int16_t*)p = INT16_MIN;break;
|
||||||
|
case TSDB_DATA_TYPE_TINYINT:*(int8_t*)p = INT8_MIN;break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
switch(pKey->pks[0].type) {
|
||||||
|
case TSDB_DATA_TYPE_BIGINT:*(int64_t*)p = INT64_MAX;break;
|
||||||
|
case TSDB_DATA_TYPE_INT:*(int32_t*)p = INT32_MAX;break;
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT:*(int16_t*)p = INT16_MAX;break;
|
||||||
|
case TSDB_DATA_TYPE_TINYINT:*(int8_t*)p = INT8_MAX;break;
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
pKey->pks[0].pData = taosMemoryCalloc(1, len);
|
pKey->pks[0].pData = taosMemoryCalloc(1, len);
|
||||||
pKey->pks[0].nData = 0;
|
pKey->pks[0].nData = 0;
|
||||||
|
@ -154,22 +169,23 @@ static int32_t initSRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t
|
||||||
|
|
||||||
static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) {
|
static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) {
|
||||||
int32_t numOfPks = pReader->suppInfo.numOfPks;
|
int32_t numOfPks = pReader->suppInfo.numOfPks;
|
||||||
|
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||||
|
|
||||||
SRowKey* pRowKey = &pScanInfo->lastProcKey;
|
SRowKey* pRowKey = &pScanInfo->lastProcKey;
|
||||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
if (asc) {
|
||||||
int64_t skey = pReader->info.window.skey;
|
int64_t skey = pReader->info.window.skey;
|
||||||
int64_t ts = (skey > INT64_MIN) ? (skey - 1) : skey;
|
int64_t ts = (skey > INT64_MIN) ? (skey - 1) : skey;
|
||||||
|
|
||||||
initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes);
|
initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes, asc);
|
||||||
initSRowKey(&pScanInfo->sttKeyInfo.nextProcKey, skey, numOfPks, pReader->suppInfo.pk.type,
|
initSRowKey(&pScanInfo->sttKeyInfo.nextProcKey, skey, numOfPks, pReader->suppInfo.pk.type,
|
||||||
pReader->suppInfo.pk.bytes);
|
pReader->suppInfo.pk.bytes, asc);
|
||||||
} else {
|
} else {
|
||||||
int64_t ekey = pReader->info.window.ekey;
|
int64_t ekey = pReader->info.window.ekey;
|
||||||
int64_t ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
int64_t ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
||||||
|
|
||||||
initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes);
|
initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes, asc);
|
||||||
initSRowKey(&pScanInfo->sttKeyInfo.nextProcKey, ekey, numOfPks, pReader->suppInfo.pk.type,
|
initSRowKey(&pScanInfo->sttKeyInfo.nextProcKey, ekey, numOfPks, pReader->suppInfo.pk.type,
|
||||||
pReader->suppInfo.pk.bytes);
|
pReader->suppInfo.pk.bytes, asc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -628,27 +628,6 @@ void tColRowGetKey(SBlockData* pBlock, int32_t irow, SRowKey* key) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tRowKeyAssign(SRowKey *pDst, SRowKey* pSrc) {
|
|
||||||
pDst->ts = pSrc->ts;
|
|
||||||
pDst->numOfPKs = pSrc->numOfPKs;
|
|
||||||
|
|
||||||
if (pSrc->numOfPKs > 0) {
|
|
||||||
for (int32_t i = 0; i < pSrc->numOfPKs; ++i) {
|
|
||||||
SValue *pVal = &pDst->pks[i];
|
|
||||||
pVal->type = pSrc->pks[i].type;
|
|
||||||
|
|
||||||
if (IS_NUMERIC_TYPE(pVal->type)) {
|
|
||||||
pVal->val = pSrc->pks[i].val;
|
|
||||||
} else {
|
|
||||||
memcpy(pVal->pData, pVal->pData, pVal->nData);
|
|
||||||
pVal->nData = pSrc->pks[i].nData;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2) {
|
int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2) {
|
||||||
int32_t c = tRowKeyCompare(&key1->key, &key2->key);
|
int32_t c = tRowKeyCompare(&key1->key, &key2->key);
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,7 @@ typedef struct STimeSliceOperatorInfo {
|
||||||
SColumn tsCol; // primary timestamp column
|
SColumn tsCol; // primary timestamp column
|
||||||
SExprSupp scalarSup; // scalar calculation
|
SExprSupp scalarSup; // scalar calculation
|
||||||
struct SFillColInfo* pFillColInfo; // fill column info
|
struct SFillColInfo* pFillColInfo; // fill column info
|
||||||
int64_t prevTs;
|
SRowKey prevKey;
|
||||||
bool prevTsSet;
|
bool prevTsSet;
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
SGroupKeys* pPrevGroupKey;
|
SGroupKeys* pPrevGroupKey;
|
||||||
|
@ -178,22 +178,49 @@ static bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) {
|
||||||
return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0);
|
return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void tRowGetKeyFromColData(int64_t ts, SColumnInfoData* pPkCol, int32_t rowIndex, SRowKey* pKey) {
|
||||||
|
pKey->ts = ts;
|
||||||
|
pKey->numOfPKs = 1;
|
||||||
|
|
||||||
|
int8_t t = pPkCol->info.type;
|
||||||
|
|
||||||
|
pKey->pks[0].type = t;
|
||||||
|
if (IS_NUMERIC_TYPE(t)) {
|
||||||
|
GET_TYPED_DATA(pKey->pks[0].val, int64_t, t, colDataGetNumData(pPkCol, rowIndex));
|
||||||
|
} else {
|
||||||
|
char* p = colDataGetVarData(pPkCol, rowIndex);
|
||||||
|
pKey->pks[0].pData = (uint8_t*)varDataVal(p);
|
||||||
|
pKey->pks[0].nData = varDataLen(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumnInfoData* pTsCol,
|
static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumnInfoData* pTsCol,
|
||||||
int32_t curIndex, int32_t rows) {
|
SColumnInfoData* pPkCol, int32_t curIndex, int32_t rows) {
|
||||||
|
|
||||||
|
|
||||||
int64_t currentTs = *(int64_t*)colDataGetData(pTsCol, curIndex);
|
int64_t currentTs = *(int64_t*)colDataGetData(pTsCol, curIndex);
|
||||||
if (currentTs > pSliceInfo->win.ekey) {
|
if (currentTs > pSliceInfo->win.ekey) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pSliceInfo->prevTsSet == true) && (currentTs == pSliceInfo->prevTs)) {
|
SRowKey cur = {.ts = currentTs, .numOfPKs = (pPkCol != NULL)? 1:0};
|
||||||
return true;
|
if (pPkCol != NULL) {
|
||||||
|
cur.pks[0].type = pPkCol->info.type;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((pSliceInfo->prevTsSet == true) && (currentTs == pSliceInfo->prevKey.ts)) {
|
||||||
|
// if (pPkCol == NULL) {
|
||||||
|
return true;
|
||||||
|
/* } else {
|
||||||
|
tRowGetKeyFromColData(currentTs, pPkCol, curIndex, &cur);
|
||||||
|
if (tRowKeyCompare(&cur, &pSliceInfo->prevKey) == 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}*/
|
||||||
}
|
}
|
||||||
|
|
||||||
pSliceInfo->prevTsSet = true;
|
pSliceInfo->prevTsSet = true;
|
||||||
pSliceInfo->prevTs = currentTs;
|
tRowKeyAssign(&pSliceInfo->prevKey, &cur);
|
||||||
|
|
||||||
|
// todo handle next
|
||||||
if (currentTs == pSliceInfo->win.ekey && curIndex < rows - 1) {
|
if (currentTs == pSliceInfo->win.ekey && curIndex < rows - 1) {
|
||||||
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1);
|
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1);
|
||||||
if (currentTs == nextTs) {
|
if (currentTs == nextTs) {
|
||||||
|
@ -695,14 +722,20 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
||||||
SInterval* pInterval = &pSliceInfo->interval;
|
SInterval* pInterval = &pSliceInfo->interval;
|
||||||
|
|
||||||
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
|
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
|
||||||
|
SColumnInfoData* pPkCol = NULL;
|
||||||
|
|
||||||
|
if (pSliceInfo->hasPk) {
|
||||||
|
pPkCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->pkCol.slotId);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t i = (pSliceInfo->pRemainRes == NULL) ? 0 : pSliceInfo->remainIndex;
|
int32_t i = (pSliceInfo->pRemainRes == NULL) ? 0 : pSliceInfo->remainIndex;
|
||||||
for (; i < pBlock->info.rows; ++i) {
|
for (; i < pBlock->info.rows; ++i) {
|
||||||
int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
|
int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
|
||||||
|
|
||||||
// check for duplicate timestamps
|
// check for duplicate timestamps
|
||||||
if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) {
|
if (checkDuplicateTimestamps(pSliceInfo, pTsCol, pPkCol, i, pBlock->info.rows)) {
|
||||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP);
|
continue;
|
||||||
|
// T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) {
|
if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) {
|
||||||
|
@ -875,11 +908,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
|
||||||
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
|
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
|
||||||
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
|
||||||
blockDataCleanup(pResBlock);
|
blockDataCleanup(pResBlock);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -1017,13 +1048,23 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
||||||
pInfo->interval.interval = pInterpPhyNode->interval;
|
pInfo->interval.interval = pInterpPhyNode->interval;
|
||||||
pInfo->current = pInfo->win.skey;
|
pInfo->current = pInfo->win.skey;
|
||||||
pInfo->prevTsSet = false;
|
pInfo->prevTsSet = false;
|
||||||
pInfo->prevTs = 0;
|
pInfo->prevKey.ts = INT64_MIN;
|
||||||
pInfo->groupId = 0;
|
pInfo->groupId = 0;
|
||||||
pInfo->pPrevGroupKey = NULL;
|
pInfo->pPrevGroupKey = NULL;
|
||||||
pInfo->pNextGroupRes = NULL;
|
pInfo->pNextGroupRes = NULL;
|
||||||
pInfo->pRemainRes = NULL;
|
pInfo->pRemainRes = NULL;
|
||||||
pInfo->remainIndex = 0;
|
pInfo->remainIndex = 0;
|
||||||
|
|
||||||
|
if (pInfo->hasPk) {
|
||||||
|
pInfo->prevKey.numOfPKs = 1;
|
||||||
|
pInfo->prevKey.ts = INT64_MIN;
|
||||||
|
pInfo->prevKey.pks[0].type = pInfo->pkCol.type;
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(pInfo->pkCol.type)) {
|
||||||
|
pInfo->prevKey.pks[0].pData = taosMemoryCalloc(1, pInfo->pkCol.bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||||
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
|
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
|
||||||
pScanInfo->base.cond.twindows = pInfo->win;
|
pScanInfo->base.cond.twindows = pInfo->win;
|
||||||
|
|
|
@ -658,11 +658,12 @@ static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow*
|
||||||
*/
|
*/
|
||||||
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId) {
|
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId) {
|
||||||
if (!pOperatorInfo->limited // if no limit info, no filter will be applied
|
if (!pOperatorInfo->limited // if no limit info, no filter will be applied
|
||||||
|| pOperatorInfo->binfo.inputTsOrder !=
|
|| pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
|
||||||
pOperatorInfo->binfo.outputTsOrder // if input/output ts order mismatch, no filter
|
// if input/output ts order mismatch, no filter
|
||||||
) {
|
) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOperatorInfo->limit == 0) return true;
|
if (pOperatorInfo->limit == 0) return true;
|
||||||
|
|
||||||
if (pOperatorInfo->pBQ == NULL) {
|
if (pOperatorInfo->pBQ == NULL) {
|
||||||
|
|
|
@ -17,11 +17,6 @@ using namespace std;
|
||||||
#pragma GCC diagnostic ignored "-Wunused-variable"
|
#pragma GCC diagnostic ignored "-Wunused-variable"
|
||||||
#pragma GCC diagnostic ignored "-Wsign-compare"
|
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||||
|
|
||||||
int main(int argc, char **argv) {
|
|
||||||
testing::InitGoogleTest(&argc, argv);
|
|
||||||
return RUN_ALL_TESTS();
|
|
||||||
}
|
|
||||||
|
|
||||||
static void checkBase58Codec(uint8_t *pRaw, int32_t rawLen, int32_t index) {
|
static void checkBase58Codec(uint8_t *pRaw, int32_t rawLen, int32_t index) {
|
||||||
int64_t start = taosGetTimestampUs();
|
int64_t start = taosGetTimestampUs();
|
||||||
char *pEnc = base58_encode((const uint8_t *)pRaw, rawLen);
|
char *pEnc = base58_encode((const uint8_t *)pRaw, rawLen);
|
||||||
|
|
Loading…
Reference in New Issue