diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index e2ecdca59f..338a0b40bd 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -160,7 +160,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const void* pRsp, int32_t tqInitDataRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset); void tqUpdateNodeStage(STQ* pTq, bool isLeader); int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, - SSubmitTbData* pTableData, const char* id); + SSubmitTbData* pTableData, int64_t earlyTs, const char* id); int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id); SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b369bd6039..8222af4d60 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -242,6 +242,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmit int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); +int64_t tsdbGetEarliestTs(STsdb* pTsdb); // tq STQ* tqOpen(const char* path, SVnode* pVnode); diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index fb898c02f8..d0913081ac 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -203,7 +203,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * int32_t *index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId)); if (index == NULL) { // no data yet, append it - code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, ""); + code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, INT64_MIN, ""); if (code != TSDB_CODE_SUCCESS) { continue; } @@ -213,7 +213,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * int32_t size = (int32_t)taosArrayGetSize(pReq->aSubmitTbData) - 1; taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)); } else { - code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, ""); + code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, INT64_MIN, ""); if (code != TSDB_CODE_SUCCESS) { continue; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 5929c6b591..3fe124ea85 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -34,7 +34,7 @@ static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSData static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks); static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen); static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, - const char* id); + int64_t earlyTs, const char* id); static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo, const char* dstTableName, int64_t* uid); static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, @@ -552,7 +552,8 @@ int32_t tsAscendingSortFn(const void* p1, const void* p2) { } } -int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id) { +int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, int64_t earlyTs, + const char* id) { int32_t numOfRows = pDataBlock->info.rows; int32_t code = TSDB_CODE_SUCCESS; @@ -581,6 +582,14 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); ts = *(int64_t*)colDataGetData(pColData, j); tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, ts); + + if (ts < earlyTs) { + tqError("s-task:%s ts:%" PRId64 " of generated results out of valid time range %" PRId64 " , discarded", id, + ts, earlyTs); + pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); + taosArrayDestroy(pVals); + return TSDB_CODE_SUCCESS; + } } if (IS_SET_NULL(pCol)) { @@ -605,8 +614,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat dataIndex++; } else { void* colData = colDataGetData(pColData, j); - if (IS_VAR_DATA_TYPE(pCol->type)) { - // address copy, no value + if (IS_VAR_DATA_TYPE(pCol->type)) { // address copy, no value SValue sv = (SValue){.type = pCol->type, .nData = varDataLen(colData), .pData = (uint8_t*)varDataVal(colData)}; SColVal cv = COL_VAL_VALUE(pCol->colId, sv); @@ -796,22 +804,25 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat } int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, - SSubmitTbData* pTableData, const char* id) { + SSubmitTbData* pTableData, int64_t earlyTs, const char* id) { int32_t numOfRows = pDataBlock->info.rows; + char* dstTableName = pDataBlock->info.parTbName; tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, id, blockIndex + 1, numOfRows, suid); - char* dstTableName = pDataBlock->info.parTbName; // convert all rows - int32_t code = doConvertRows(pTableData, pTSchema, pDataBlock, id); + int32_t code = doConvertRows(pTableData, pTSchema, pDataBlock, earlyTs, id); if (code != TSDB_CODE_SUCCESS) { tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno)); return code; } - taosArraySort(pTableData->aRowP, tsAscendingSortFn); - tqTrace("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows); + if (pTableData->aRowP != NULL) { + taosArraySort(pTableData->aRowP, tsAscendingSortFn); + tqTrace("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows); + } + return code; } @@ -836,6 +847,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { int32_t numOfBlocks = taosArrayGetSize(pBlocks); int32_t code = TSDB_CODE_SUCCESS; const char* id = pTask->id.idStr; + int64_t earlyTs = tsdbGetEarliestTs(pVnode->pTsdb); bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks); if (!onlySubmitData) { @@ -870,8 +882,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { continue; } - code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); - if (code != TSDB_CODE_SUCCESS) { + code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id); + if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) { continue; } @@ -918,8 +930,12 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { continue; } - code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); - if (code != TSDB_CODE_SUCCESS) { + code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id); + if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) { + if (tbData.pCreateTbReq != NULL) { + tdDestroySVCreateTbReq(tbData.pCreateTbReq); + tbData.pCreateTbReq = NULL; + } continue; } @@ -928,8 +944,12 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1; taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)); } else { - code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); - if (code != TSDB_CODE_SUCCESS) { + code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id); + if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) { + if (tbData.pCreateTbReq != NULL) { + tdDestroySVCreateTbReq(tbData.pCreateTbReq); + tbData.pCreateTbReq = NULL; + } continue; } diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index ea3d285880..f9fede1d9b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -30,6 +30,14 @@ int32_t tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) { return 0; } +int64_t tsdbGetEarliestTs(STsdb *pTsdb) { + STsdbKeepCfg *pCfg = &pTsdb->keepCfg; + + int64_t now = taosGetTimestamp(pCfg->precision); + int64_t ts = now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick + return ts; +} + /** * @brief * diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 99520f7c92..d7c3eff571 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -226,14 +226,10 @@ static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { return pWindow->skey // Update the query time window according to the data time to live(TTL) information, in order to avoid to return // the expired data to client, even it is queried already. static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) { - STsdbKeepCfg* pCfg = &pTsdb->keepCfg; - - int64_t now = taosGetTimestamp(pCfg->precision); - int64_t earilyTs = now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick - + int64_t earlyTs = tsdbGetEarliestTs(pTsdb); STimeWindow win = *pWindow; - if (win.skey < earilyTs) { - win.skey = earilyTs; + if (win.skey < earlyTs) { + win.skey = earlyTs; } return win;