fix(stream): discard the whole result data block if the primary timestamp of any of its rows has expired.

This commit is contained in:
Haojun Liao 2024-05-30 14:12:23 +08:00
parent 0946f25e5f
commit cff5e753ef
6 changed files with 50 additions and 25 deletions

View File

@ -160,7 +160,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const void* pRsp,
int32_t tqInitDataRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset);
void tqUpdateNodeStage(STQ* pTq, bool isLeader);
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
SSubmitTbData* pTableData, const char* id);
SSubmitTbData* pTableData, int64_t earlyTs, const char* id);
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id);
SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols,

View File

@ -242,6 +242,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmit
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
int64_t tsdbGetEarliestTs(STsdb* pTsdb);
// tq
STQ* tqOpen(const char* path, SVnode* pVnode);

View File

@ -203,7 +203,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
int32_t *index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
if (index == NULL) { // no data yet, append it
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, "");
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, INT64_MIN, "");
if (code != TSDB_CODE_SUCCESS) {
continue;
}
@ -213,7 +213,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
int32_t size = (int32_t)taosArrayGetSize(pReq->aSubmitTbData) - 1;
taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
} else {
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, "");
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, INT64_MIN, "");
if (code != TSDB_CODE_SUCCESS) {
continue;
}

View File

@ -34,7 +34,7 @@ static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSData
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks);
static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock,
const char* id);
int64_t earlyTs, const char* id);
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
const char* dstTableName, int64_t* uid);
static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId,
@ -552,7 +552,8 @@ int32_t tsAscendingSortFn(const void* p1, const void* p2) {
}
}
int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id) {
int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, int64_t earlyTs,
const char* id) {
int32_t numOfRows = pDataBlock->info.rows;
int32_t code = TSDB_CODE_SUCCESS;
@ -581,6 +582,14 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
ts = *(int64_t*)colDataGetData(pColData, j);
tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, ts);
if (ts < earlyTs) {
tqError("s-task:%s ts:%" PRId64 " of generated results out of valid time range %" PRId64 " , discarded", id,
ts, earlyTs);
pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals);
return TSDB_CODE_SUCCESS;
}
}
if (IS_SET_NULL(pCol)) {
@ -605,8 +614,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
dataIndex++;
} else {
void* colData = colDataGetData(pColData, j);
if (IS_VAR_DATA_TYPE(pCol->type)) {
// address copy, no value
if (IS_VAR_DATA_TYPE(pCol->type)) { // address copy, no value
SValue sv =
(SValue){.type = pCol->type, .nData = varDataLen(colData), .pData = (uint8_t*)varDataVal(colData)};
SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
@ -796,22 +804,25 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
}
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
SSubmitTbData* pTableData, const char* id) {
SSubmitTbData* pTableData, int64_t earlyTs, const char* id) {
int32_t numOfRows = pDataBlock->info.rows;
char* dstTableName = pDataBlock->info.parTbName;
tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, id,
blockIndex + 1, numOfRows, suid);
char* dstTableName = pDataBlock->info.parTbName;
// convert all rows
int32_t code = doConvertRows(pTableData, pTSchema, pDataBlock, id);
int32_t code = doConvertRows(pTableData, pTSchema, pDataBlock, earlyTs, id);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno));
return code;
}
taosArraySort(pTableData->aRowP, tsAscendingSortFn);
tqTrace("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
if (pTableData->aRowP != NULL) {
taosArraySort(pTableData->aRowP, tsAscendingSortFn);
tqTrace("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
}
return code;
}
@ -836,6 +847,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
int32_t numOfBlocks = taosArrayGetSize(pBlocks);
int32_t code = TSDB_CODE_SUCCESS;
const char* id = pTask->id.idStr;
int64_t earlyTs = tsdbGetEarliestTs(pVnode->pTsdb);
bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
if (!onlySubmitData) {
@ -870,8 +882,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
continue;
}
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id);
if (code != TSDB_CODE_SUCCESS) {
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
continue;
}
@ -918,8 +930,12 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
continue;
}
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id);
if (code != TSDB_CODE_SUCCESS) {
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
if (tbData.pCreateTbReq != NULL) {
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
tbData.pCreateTbReq = NULL;
}
continue;
}
@ -928,8 +944,12 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
} else {
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id);
if (code != TSDB_CODE_SUCCESS) {
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
if (tbData.pCreateTbReq != NULL) {
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
tbData.pCreateTbReq = NULL;
}
continue;
}

View File

@ -30,6 +30,14 @@ int32_t tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) {
return 0;
}
/**
 * @brief Compute the earliest timestamp derived from the keep configuration of a tsdb instance.
 *
 * The value is (current time) - (tsTickPerMin[precision] * keep2) + 1, where the current time
 * is obtained from taosGetTimestamp() using the precision stored in the keep configuration.
 * NOTE(review): keep2 is presumably a duration in minutes, given that it is scaled by
 * tsTickPerMin -- confirm against the STsdbKeepCfg definition.
 *
 * @param pTsdb tsdb instance whose keepCfg is read; it is dereferenced without a NULL check
 * @return the computed timestamp, in the precision of the keep configuration
 */
int64_t tsdbGetEarliestTs(STsdb *pTsdb) {
  const STsdbKeepCfg *pKeepCfg = &pTsdb->keepCfg;

  // Sample the clock first, exactly as the straight-line version does.
  int64_t curTs = taosGetTimestamp(pKeepCfg->precision);

  // Length of the keep2 period expressed in ticks of the configured precision.
  int64_t keepTicks = tsTickPerMin[pKeepCfg->precision] * pKeepCfg->keep2;

  // One tick is added, so the result is one past (curTs - keepTicks).
  return curTs - keepTicks + 1;
}
/**
* @brief
*

View File

@ -226,14 +226,10 @@ static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { return pWindow->skey
// Update the query time window according to the data time-to-live (TTL) information, so that expired data
// is not returned to the client even if it has already been queried.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
STsdbKeepCfg* pCfg = &pTsdb->keepCfg;
int64_t now = taosGetTimestamp(pCfg->precision);
int64_t earilyTs = now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick
int64_t earlyTs = tsdbGetEarliestTs(pTsdb);
STimeWindow win = *pWindow;
if (win.skey < earilyTs) {
win.skey = earilyTs;
if (win.skey < earlyTs) {
win.skey = earlyTs;
}
return win;