fix(stream): fix stream processing for virtual tables

- Fixed status check in virtual table stream processing
- Fixed multi-way merge logic for super virtual tables
- Fixed generation of target subtable names during merge
This commit is contained in:
Jinqing Kuang 2025-03-19 23:15:16 +08:00
parent 0a96294056
commit c82398e175
18 changed files with 567 additions and 617 deletions

View File

@ -102,6 +102,8 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
int32_t qSetStreamNotifyInfo(qTaskInfo_t tinfo, int32_t eventTypes, const SSchemaWrapper* pSchemaWrapper, int32_t qSetStreamNotifyInfo(qTaskInfo_t tinfo, int32_t eventTypes, const SSchemaWrapper* pSchemaWrapper,
const char* stbFullName, bool newSubTableRule, STaskNotifyEventStat* pNotifyEventStat); const char* stbFullName, bool newSubTableRule, STaskNotifyEventStat* pNotifyEventStat);
void qSetStreamMergeInfo(qTaskInfo_t tinfo, SArray* pVTables);
/** /**
* Set multiple input data blocks for the stream scan. * Set multiple input data blocks for the stream scan.
* @param tinfo * @param tinfo

View File

@ -1073,12 +1073,6 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
if (IS_VAR_DATA_TYPE(pCol->info.type)) { if (IS_VAR_DATA_TYPE(pCol->info.type)) {
size_t metaSize = pBlock->info.rows * sizeof(int32_t); size_t metaSize = pBlock->info.rows * sizeof(int32_t);
char* tmp = taosMemoryRealloc(pCol->varmeta.offset, metaSize); // preview calloc is too small
if (tmp == NULL) {
return terrno;
}
pCol->varmeta.offset = (int32_t*)tmp;
memcpy(pCol->varmeta.offset, pStart, metaSize); memcpy(pCol->varmeta.offset, pStart, metaSize);
pStart += metaSize; pStart += metaSize;
} else { } else {

View File

@ -375,6 +375,7 @@ static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, EStreamTaskT
uint64_t uid = 0; uint64_t uid = 0;
SArray** pTaskList = NULL; SArray** pTaskList = NULL;
if (pSourceTaskList) { if (pSourceTaskList) {
uid = pStream->uid;
pTaskList = &pSourceTaskList; pTaskList = &pSourceTaskList;
} else { } else {
streamGetUidTaskList(pStream, type, &uid, &pTaskList); streamGetUidTaskList(pStream, type, &uid, &pTaskList);
@ -454,6 +455,7 @@ static int32_t addSourceTaskVTableOutput(SStreamTask* pTask, SSHashObj* pVgTasks
TSDB_CHECK_NULL(pTaskMap, code, lino, _end, terrno); TSDB_CHECK_NULL(pTaskMap, code, lino, _end, terrno);
pTask->outputInfo.type = TASK_OUTPUT__VTABLE_MAP; pTask->outputInfo.type = TASK_OUTPUT__VTABLE_MAP;
pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
STaskDispatcherVtableMap *pDispatcher = &pTask->outputInfo.vtableMapDispatcher; STaskDispatcherVtableMap *pDispatcher = &pTask->outputInfo.vtableMapDispatcher;
pDispatcher->taskInfos = taosArrayInit(taskNum, sizeof(STaskDispatcherFixed)); pDispatcher->taskInfos = taosArrayInit(taskNum, sizeof(STaskDispatcherFixed));
TSDB_CHECK_NULL(pDispatcher->taskInfos, code, lino, _end, terrno); TSDB_CHECK_NULL(pDispatcher->taskInfos, code, lino, _end, terrno);
@ -462,26 +464,32 @@ static int32_t addSourceTaskVTableOutput(SStreamTask* pTask, SSHashObj* pVgTasks
int32_t iter = 0, vgId = 0; int32_t iter = 0, vgId = 0;
uint64_t uid = 0; uint64_t uid = 0;
STaskDispatcherFixed* pAddr = NULL;
void* p = NULL; void* p = NULL;
while (NULL != (p = tSimpleHashIterate(pVtables, p, &iter))) { while (NULL != (p = tSimpleHashIterate(pVtables, p, &iter))) {
char* vgUid = tSimpleHashGetKey(p, NULL); char* vgUid = tSimpleHashGetKey(p, NULL);
vgId = *(int32_t*)vgUid; vgId = *(int32_t*)vgUid;
uid = *(uint64_t*)((int32_t*)vgUid + 1); uid = *(uint64_t*)((int32_t*)vgUid + 1);
pAddr = tSimpleHashGet(pVgTasks, &vgId, sizeof(vgId)); void *px = tSimpleHashGet(pVgTasks, &vgId, sizeof(vgId));
if (NULL == pAddr) { if (NULL == px) {
mError("tSimpleHashGet vgId %d not found", vgId); mError("tSimpleHashGet vgId %d not found", vgId);
return code; return code;
} }
SStreamTask* pMergeTask = *(SStreamTask**)px;
if (pMergeTask == NULL) {
mError("tSimpleHashGet pMergeTask %d not found", vgId);
return code;
}
void* px = tSimpleHashGet(pTaskMap, &pAddr->taskId, sizeof(int32_t)); px = tSimpleHashGet(pTaskMap, &pMergeTask->id.taskId, sizeof(pMergeTask->id.taskId));
int32_t idx = 0; int32_t idx = 0;
if (px == NULL) { if (px == NULL) {
px = taosArrayPush(pDispatcher->taskInfos, pAddr); STaskDispatcherFixed addr = {
.taskId = pMergeTask->id.taskId, .nodeId = pMergeTask->info.nodeId, .epSet = pMergeTask->info.epSet};
px = taosArrayPush(pDispatcher->taskInfos, &addr);
TSDB_CHECK_NULL(px, code, lino, _end, terrno); TSDB_CHECK_NULL(px, code, lino, _end, terrno);
idx = taosArrayGetSize(pDispatcher->taskInfos) - 1; idx = taosArrayGetSize(pDispatcher->taskInfos) - 1;
code = tSimpleHashPut(pTaskMap, &pAddr->taskId, sizeof(int32_t), &idx, sizeof(int32_t)); code = tSimpleHashPut(pTaskMap, &pMergeTask->id.taskId, sizeof(pMergeTask->id.taskId), &idx, sizeof(idx));
if (code) { if (code) {
mError("tSimpleHashPut uid to task idx failed, error:%d", code); mError("tSimpleHashPut uid to task idx failed, error:%d", code);
return code; return code;
@ -495,9 +503,15 @@ static int32_t addSourceTaskVTableOutput(SStreamTask* pTask, SSHashObj* pVgTasks
mError("tSimpleHashPut uid to STaskDispatcherFixed failed, error:%d", code); mError("tSimpleHashPut uid to STaskDispatcherFixed failed, error:%d", code);
return code; return code;
} }
mDebug("source task[%s,vg:%d] add vtable output map, vuid %" PRIu64 " => [%d, vg:%d]", code = streamTaskSetUpstreamInfo(pMergeTask, pTask);
pTask->id.idStr, pTask->info.nodeId, uid, pAddr->taskId, pAddr->nodeId); if (code != TSDB_CODE_SUCCESS) {
mError("failed to set upstream info of merge task, error:%d", code);
return code;
}
mDebug("source task[%s,vg:%d] add vtable output map, vuid %" PRIu64 " => [%d, vg:%d]", pTask->id.idStr,
pTask->info.nodeId, uid, pMergeTask->id.taskId, pMergeTask->info.nodeId);
} }
_end: _end:
@ -662,7 +676,6 @@ static int32_t addVTableMergeTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pS
} }
static int32_t buildMergeTaskHash(SArray* pMergeTaskList, SSHashObj** ppVgTasks) { static int32_t buildMergeTaskHash(SArray* pMergeTaskList, SSHashObj** ppVgTasks) {
STaskDispatcherFixed addr;
int32_t code = 0; int32_t code = 0;
int32_t taskNum = taosArrayGetSize(pMergeTaskList); int32_t taskNum = taosArrayGetSize(pMergeTaskList);
@ -676,11 +689,7 @@ static int32_t buildMergeTaskHash(SArray* pMergeTaskList, SSHashObj** ppVgTasks)
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
SStreamTask* pTask = taosArrayGetP(pMergeTaskList, i); SStreamTask* pTask = taosArrayGetP(pMergeTaskList, i);
addr.taskId = pTask->id.taskId; code = tSimpleHashPut(*ppVgTasks, &pTask->info.nodeId, sizeof(pTask->info.nodeId), &pTask, POINTER_BYTES);
addr.nodeId = pTask->info.nodeId;
addr.epSet = pTask->info.epSet;
code = tSimpleHashPut(*ppVgTasks, &addr.nodeId, sizeof(addr.nodeId), &addr, sizeof(addr));
if (code) { if (code) {
mError("tSimpleHashPut %d STaskDispatcherFixed failed", i); mError("tSimpleHashPut %d STaskDispatcherFixed failed", i);
return code; return code;
@ -725,10 +734,9 @@ static int32_t addVTableSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* p
} }
plan->pVTables = *(SSHashObj**)p; plan->pVTables = *(SSHashObj**)p;
*(SSHashObj**)p = NULL;
code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam, code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam,
hasAggTasks, pVgTasks, pSourceTaskList); hasAggTasks, pVgTasks, pSourceTaskList);
plan->pVTables = NULL;
if (code != 0) { if (code != 0) {
mError("failed to create stream task, code:%s", tstrerror(code)); mError("failed to create stream task, code:%s", tstrerror(code));
@ -1220,7 +1228,7 @@ static int32_t addVgroupToRes(char* fDBName, int32_t vvgId, uint64_t vuid, SRefC
char dbVgId[TSDB_DB_NAME_LEN + 32]; char dbVgId[TSDB_DB_NAME_LEN + 32];
SSHashObj *pTarVg = NULL, *pNewVg = NULL; SSHashObj *pTarVg = NULL, *pNewVg = NULL;
TSDB_CHECK_CODE(getTableVgId(pDb, 1, fDBName, &vgId, pCol->refColName), lino, _return); TSDB_CHECK_CODE(getTableVgId(pDb, 1, fDBName, &vgId, pCol->refTableName), lino, _return);
snprintf(dbVgId, sizeof(dbVgId), "%s.%d", pCol->refDbName, vgId); snprintf(dbVgId, sizeof(dbVgId), "%s.%d", pCol->refDbName, vgId);

View File

@ -1307,6 +1307,8 @@ int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlo
STR_WITH_SIZE_TO_VARSTR(level, "agg", 3); STR_WITH_SIZE_TO_VARSTR(level, "agg", 3);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
STR_WITH_SIZE_TO_VARSTR(level, "sink", 4); STR_WITH_SIZE_TO_VARSTR(level, "sink", 4);
} else if (pTask->info.taskLevel == TASK_LEVEL__MERGE) {
STR_WITH_SIZE_TO_VARSTR(level, "merge", 5);
} }
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);

View File

@ -1200,7 +1200,7 @@ int32_t tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const
} }
tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList)); tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
return tqCollectPhysicalTables(pReader, id); return TSDB_CODE_SUCCESS;
} }
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) { void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
@ -1498,8 +1498,7 @@ static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr) {
pScanInfo->cacheHit = 0; pScanInfo->cacheHit = 0;
pVirtualTables = pScanInfo->pVirtualTables; pVirtualTables = pScanInfo->pVirtualTables;
if (taosHashGetSize(pVirtualTables) == 0 || taosHashGetSize(pReader->tbIdHash) == 0 || if (taosHashGetSize(pVirtualTables) == 0 || taosArrayGetSize(pReader->pColIdList) == 0) {
taosArrayGetSize(pReader->pColIdList) == 0) {
goto _end; goto _end;
} }
@ -1507,13 +1506,10 @@ static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr) {
TSDB_CHECK_NULL(pPhysicalTables, code, lino, _end, terrno); TSDB_CHECK_NULL(pPhysicalTables, code, lino, _end, terrno);
taosHashSetFreeFp(pPhysicalTables, destroySourceScanTables); taosHashSetFreeFp(pPhysicalTables, destroySourceScanTables);
pIter = taosHashIterate(pReader->tbIdHash, NULL); pIter = taosHashIterate(pVirtualTables, NULL);
while (pIter != NULL) { while (pIter != NULL) {
int64_t vTbUid = *(int64_t*)taosHashGetKey(pIter, NULL); int64_t vTbUid = *(int64_t*)taosHashGetKey(pIter, NULL);
SArray* pColInfos = *(SArray**)pIter;
px = taosHashGet(pVirtualTables, &vTbUid, sizeof(int64_t));
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
SArray* pColInfos = *(SArray**)px;
TSDB_CHECK_NULL(pColInfos, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); TSDB_CHECK_NULL(pColInfos, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
// Traverse all required columns and collect corresponding physical tables // Traverse all required columns and collect corresponding physical tables
@ -1548,7 +1544,7 @@ static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr) {
j++; j++;
} }
} }
pIter = taosHashIterate(pReader->tbIdHash, pIter); pIter = taosHashIterate(pVirtualTables, pIter);
} }
pScanInfo->pPhysicalTables = pPhysicalTables; pScanInfo->pPhysicalTables = pPhysicalTables;
@ -1574,9 +1570,8 @@ _end:
static void freeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) { static void freeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) {
if (value) { if (value) {
SSchemaWrapper** ppSchemaWrapper = value; SSchemaWrapper* pSchemaWrapper = value;
tDeleteSchemaWrapper(*ppSchemaWrapper); tDeleteSchemaWrapper(pSchemaWrapper);
*ppSchemaWrapper = NULL;
} }
} }
@ -1686,8 +1681,8 @@ int32_t tqRetrieveVTableDataBlock(STqReader* pReader, SSDataBlock** pRes, const
SColumnInfoData* pOutCol = taosArrayGet(pBlock->pDataBlock, j); SColumnInfoData* pOutCol = taosArrayGet(pBlock->pDataBlock, j);
TSDB_CHECK_NULL(pOutCol, code, lino, _end, terrno); TSDB_CHECK_NULL(pOutCol, code, lino, _end, terrno);
if (i >= nColInfos) { if (i >= nColInfos) {
tqInfo("%s has %d column info, but vtable column %d is missing, id: %s", __func__, nColInfos, pOutCol->info.colId, tqTrace("%s has %d column info, but vtable column %d is missing, id: %s", __func__, nColInfos,
idstr); pOutCol->info.colId, idstr);
colDataSetNNULL(pOutCol, 0, numOfRows); colDataSetNNULL(pOutCol, 0, numOfRows);
j++; j++;
continue; continue;
@ -1699,17 +1694,26 @@ int32_t tqRetrieveVTableDataBlock(STqReader* pReader, SSDataBlock** pRes, const
i++; i++;
continue; continue;
} else if (pCol->vColId > pOutCol->info.colId) { } else if (pCol->vColId > pOutCol->info.colId) {
tqInfo("%s does not find column info for vtable column %d, closest vtable column is %d, id: %s", __func__, tqTrace("%s does not find column info for vtable column %d, closest vtable column is %d, id: %s", __func__,
pOutCol->info.colId, pCol->vColId, idstr); pOutCol->info.colId, pCol->vColId, idstr);
colDataSetNNULL(pOutCol, 0, numOfRows); colDataSetNNULL(pOutCol, 0, numOfRows);
j++; j++;
continue; continue;
} }
// copy data from physical table to the result block of virtual table // skip this column if it is from another physical table
if (pCol->pTbUid != pTbUid) { if (pCol->pTbUid != pTbUid) {
// skip this column since it is from another physical table tqTrace("skip column %d of virtual table %" PRId64 " since it is from table %" PRId64
} else if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { ", current block table %" PRId64 ", id: %s",
pCol->vColId, vTbUid, pCol->pTbUid, pTbUid, idstr);
colDataSetNNULL(pOutCol, 0, numOfRows);
i++;
j++;
continue;
}
// copy data from physical table to the result block of virtual table
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
// try to find the corresponding column data of physical table // try to find the corresponding column data of physical table
SColData* pColData = NULL; SColData* pColData = NULL;
for (int32_t k = 0; k < nInputCols; ++k) { for (int32_t k = 0; k < nInputCols; ++k) {
@ -1860,7 +1864,7 @@ _end:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr); tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
} }
return (code == TSDB_CODE_SUCCESS); return false;
} }
bool tqReaderIsQueriedSourceTable(STqReader* pReader, uint64_t uid) { bool tqReaderIsQueriedSourceTable(STqReader* pReader, uint64_t uid) {

View File

@ -77,7 +77,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
.pOtherBackend = NULL, .pOtherBackend = NULL,
}; };
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__MERGE) {
handle.vnode = ((STQ*)pMeta->ahandle)->pVnode; handle.vnode = ((STQ*)pMeta->ahandle)->pVnode;
handle.initTqReader = 1; handle.initTqReader = 1;
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
@ -86,7 +86,8 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
initStorageAPI(&handle.api); initStorageAPI(&handle.api);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG ||
pTask->info.taskLevel == TASK_LEVEL__MERGE) {
if (pTask->info.fillHistory == STREAM_RECALCUL_TASK) { if (pTask->info.fillHistory == STREAM_RECALCUL_TASK) {
handle.pStateBackend = pTask->pRecalState; handle.pStateBackend = pTask->pRecalState;
handle.pOtherBackend = pTask->pState; handle.pOtherBackend = pTask->pState;
@ -113,6 +114,8 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
tqError("s-task:%s failed to set stream notify info, code:%s", pTask->id.idStr, tstrerror(code)); tqError("s-task:%s failed to set stream notify info, code:%s", pTask->id.idStr, tstrerror(code));
return code; return code;
} }
qSetStreamMergeInfo(pTask->exec.pExecutor, pTask->pVTables);
} }
streamSetupScheduleTrigger(pTask); streamSetupScheduleTrigger(pTask);

View File

@ -633,7 +633,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
if (TDMT_VND_TABLE_CFG == msgType) { if (TDMT_VND_TABLE_CFG == msgType) {
SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
pName = ctx->pName; pName = ctx->pName;
} else if (TDMT_VND_TABLE_META == msgType || TDMT_VND_TABLE_NAME == msgType) { } else if (TDMT_VND_TABLE_META == msgType || TDMT_VND_TABLE_NAME == msgType ||
TDMT_VND_VSUBTABLES_META == msgType) {
if (CTG_TASK_GET_TB_META_BATCH == pTask->type) { if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
@ -714,7 +715,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
if (TDMT_VND_TABLE_CFG == msgType) { if (TDMT_VND_TABLE_CFG == msgType) {
SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
pName = ctx->pName; pName = ctx->pName;
} else if (TDMT_VND_TABLE_META == msgType || TDMT_VND_TABLE_NAME == msgType) { } else if (TDMT_VND_TABLE_META == msgType || TDMT_VND_TABLE_NAME == msgType ||
TDMT_VND_VSUBTABLES_META == msgType) {
if (CTG_TASK_GET_TB_META_BATCH == pTask->type) { if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);

View File

@ -546,7 +546,9 @@ typedef struct SStreamScanInfo {
uint64_t numOfExec; // execution times uint64_t numOfExec; // execution times
STqReader* tqReader; STqReader* tqReader;
SHashObj* pVtableMergeHandles; // key: vtable uid, value: SStreamVtableMergeHandle SHashObj* pVtableMergeHandles; // key: vtable uid, value: SStreamVtableMergeHandle
SDiskbasedBuf* pVtableMergeBuf; // page buffer used by vtable merge
SArray* pVtableReadyHandles;
uint64_t groupId; uint64_t groupId;
bool igCheckGroupId; bool igCheckGroupId;

View File

@ -188,7 +188,7 @@ int32_t createEventNonblockOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, SReadHandle* readHandle, STableListInfo* pTableListInfo, int32_t numOfDownstream, SVirtualScanPhysiNode * pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo); int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, SReadHandle* readHandle, STableListInfo* pTableListInfo, int32_t numOfDownstream, SVirtualScanPhysiNode * pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo);
int32_t createStreamVtableMergeOperatorInfo(SReadHandle* pHandle, SVirtualScanPhysiNode* pVirtualScanNode, SNode* pTagCond, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); int32_t createStreamVtableMergeOperatorInfo(SOperatorInfo* pDownstream, SReadHandle* pHandle, SVirtualScanPhysiNode* pVirtualScanNode, SNode* pTagCond, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
// clang-format on // clang-format on
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,

View File

@ -77,6 +77,7 @@ typedef struct {
char* stbFullName; // used to generate dest child table name char* stbFullName; // used to generate dest child table name
bool newSubTableRule; // used to generate dest child table name bool newSubTableRule; // used to generate dest child table name
STaskNotifyEventStat* pNotifyEventStat; // used to store notify event statistics STaskNotifyEventStat* pNotifyEventStat; // used to store notify event statistics
SArray * pVTables; // used to store merge info for merge task, SArray<SVCTableMergeInfo>
} SStreamTaskInfo; } SStreamTaskInfo;
struct SExecTaskInfo { struct SExecTaskInfo {

View File

@ -33,12 +33,14 @@ typedef enum {
int32_t streamVtableMergeCreateHandle(SStreamVtableMergeHandle **ppHandle, int32_t nSrcTbls, int32_t numPageLimit, int32_t streamVtableMergeCreateHandle(SStreamVtableMergeHandle **ppHandle, int32_t nSrcTbls, int32_t numPageLimit,
SDiskbasedBuf *pBuf, SSDataBlock *pResBlock, const char *idstr); SDiskbasedBuf *pBuf, SSDataBlock *pResBlock, const char *idstr);
void streamVtableMergeDestroyHandle(SStreamVtableMergeHandle **ppHandle); void streamVtableMergeDestroyHandle(void *ppHandle);
int32_t streamVtableMergeAddBlock(SStreamVtableMergeHandle *pHandle, SSDataBlock *pDataBlock, const char *idstr); int32_t streamVtableMergeAddBlock(SStreamVtableMergeHandle *pHandle, SSDataBlock *pDataBlock, const char *idstr);
int32_t streamVtableMergeNextTuple(SStreamVtableMergeHandle *pHandle, SSDataBlock *pResBlock, SVM_NEXT_RESULT *pRes, int32_t streamVtableMergeMoveNext(SStreamVtableMergeHandle *pHandle, SVM_NEXT_RESULT *pRes, const char *idstr);
const char *idstr);
int32_t streamVtableMergeCurrent(SStreamVtableMergeHandle *pHandle, SSDataBlock **ppDataBlock, int32_t *pRowIdx,
const char *idstr);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -141,8 +141,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
const char* id) { const char* id) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN && if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id); qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
@ -275,6 +274,15 @@ _end:
return code; return code;
} }
void qSetStreamMergeInfo(qTaskInfo_t tinfo, SArray* pVTables) {
if (tinfo == 0 || pVTables == NULL) {
return;
}
SStreamTaskInfo* pStreamInfo = &((SExecTaskInfo*)tinfo)->streamInfo;
pStreamInfo->pVTables = pVTables;
}
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;

View File

@ -382,7 +382,7 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
return terrno; return terrno;
} }
if (pHandle->vnode) { if (pHandle->vnode && (pTaskInfo->pSubplan->pVTables == NULL)) {
code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
if (code) { if (code) {
@ -515,8 +515,6 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOperator); code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOperator);
} else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type && model != OPTR_EXEC_MODEL_STREAM) { } else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type && model != OPTR_EXEC_MODEL_STREAM) {
code = createVirtualTableMergeOperatorInfo(NULL, pHandle, NULL, 0, (SVirtualScanPhysiNode*)pPhyNode, pTaskInfo, &pOperator); code = createVirtualTableMergeOperatorInfo(NULL, pHandle, NULL, 0, (SVirtualScanPhysiNode*)pPhyNode, pTaskInfo, &pOperator);
} else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type && model == OPTR_EXEC_MODEL_STREAM) {
code = createStreamVtableMergeOperatorInfo(pHandle, (SVirtualScanPhysiNode*)pPhyNode, pTagCond, pTaskInfo, &pOperator);
} else { } else {
code = TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
pTaskInfo->code = code; pTaskInfo->code = code;
@ -689,6 +687,27 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
code = createVirtualTableMergeOperatorInfo(ops, pHandle, pTableListInfo, size, (SVirtualScanPhysiNode*)pPhyNode, pTaskInfo, &pOptr); code = createVirtualTableMergeOperatorInfo(ops, pHandle, pTableListInfo, size, (SVirtualScanPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
} else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type && model == OPTR_EXEC_MODEL_STREAM) {
SVirtualScanPhysiNode* pVirtualTableScanNode = (SVirtualScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
if (!pTableListInfo) {
pTaskInfo->code = terrno;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
code = createScanTableListInfo(&pVirtualTableScanNode->scan, pVirtualTableScanNode->pGroupTags,
pVirtualTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond,
pTaskInfo);
if (code) {
pTaskInfo->code = code;
tableListDestroy(pTableListInfo);
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return code;
}
code = createStreamVtableMergeOperatorInfo(ops[0], pHandle, pVirtualTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOptr);
tableListDestroy(pTableListInfo);
} else { } else {
code = TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
pTaskInfo->code = code; pTaskInfo->code = code;

View File

@ -3762,9 +3762,20 @@ static int32_t doStreamScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo; SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;
SSHashObj* pVtableInfos = pTaskInfo->pSubplan->pVTables;
qDebug("stream scan started, %s", id); qDebug("stream scan started, %s", id);
if (pVtableInfos != NULL && pStreamInfo->recoverStep != STREAM_RECOVER_STEP__NONE) {
qError("stream vtable source scan should not have recovery step: %d", pStreamInfo->recoverStep);
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;
}
if (pVtableInfos != NULL && !pInfo->igCheckUpdate) {
qError("stream vtable source scan should have igCheckUpdate");
pInfo->igCheckUpdate = false;
}
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE1 || if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE2) { pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
@ -3863,6 +3874,10 @@ static int32_t doStreamScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
// TODO: refactor // TODO: refactor
FETCH_NEXT_BLOCK: FETCH_NEXT_BLOCK:
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
if (pVtableInfos != NULL) {
qInfo("stream vtable source scan would ignore all data blocks");
pInfo->validBlockIndex = total;
}
if (pInfo->validBlockIndex >= total) { if (pInfo->validBlockIndex >= total) {
doClearBufferedBlocks(pInfo); doClearBufferedBlocks(pInfo);
(*ppRes) = NULL; (*ppRes) = NULL;
@ -4013,6 +4028,10 @@ FETCH_NEXT_BLOCK:
return code; return code;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) { } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
qDebug("stream scan mode:%d, %s", pInfo->scanMode, id); qDebug("stream scan mode:%d, %s", pInfo->scanMode, id);
if (pVtableInfos != NULL && pInfo->scanMode != STREAM_SCAN_FROM_READERHANDLE) {
qError("stream vtable source scan should not have scan mode: %d", pInfo->scanMode);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
}
switch (pInfo->scanMode) { switch (pInfo->scanMode) {
case STREAM_SCAN_FROM_RES: { case STREAM_SCAN_FROM_RES: {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
@ -4167,6 +4186,11 @@ FETCH_NEXT_BLOCK:
continue; continue;
} }
if (pVtableInfos != NULL && pInfo->pCreateTbRes->info.rows > 0) {
qError("stream vtable source scan should not have create table res");
blockDataCleanup(pInfo->pCreateTbRes);
}
if (pInfo->pCreateTbRes->info.rows > 0) { if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES; pInfo->scanMode = STREAM_SCAN_FROM_RES;
qDebug("create table res exists, rows:%" PRId64 " return from stream scan, %s", qDebug("create table res exists, rows:%" PRId64 " return from stream scan, %s",
@ -4178,8 +4202,11 @@ FETCH_NEXT_BLOCK:
code = doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes); code = doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
setStreamOperatorState(&pInfo->basic, pInfo->pRes->info.type); setStreamOperatorState(&pInfo->basic, pInfo->pRes->info.type);
code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); if (pVtableInfos == NULL) {
QUERY_CHECK_CODE(code, lino, _end); // filter should be applied in merge task for vtables
code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
}
code = blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); code = blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
@ -4494,6 +4521,14 @@ void destroyStreamScanOperatorInfo(void* param) {
taosHashCleanup(pStreamScan->pVtableMergeHandles); taosHashCleanup(pStreamScan->pVtableMergeHandles);
pStreamScan->pVtableMergeHandles = NULL; pStreamScan->pVtableMergeHandles = NULL;
} }
if (pStreamScan->pVtableMergeBuf) {
destroyDiskbasedBuf(pStreamScan->pVtableMergeBuf);
pStreamScan->pVtableMergeBuf = NULL;
}
if (pStreamScan->pVtableReadyHandles) {
taosArrayDestroy(pStreamScan->pVtableReadyHandles);
pStreamScan->pVtableReadyHandles = NULL;
}
if (pStreamScan->matchInfo.pList) { if (pStreamScan->matchInfo.pList) {
taosArrayDestroy(pStreamScan->matchInfo.pList); taosArrayDestroy(pStreamScan->matchInfo.pList);
} }
@ -4962,8 +4997,8 @@ _error:
taosArrayDestroy(pColIds); taosArrayDestroy(pColIds);
} }
if (pInfo != NULL) { if (pInfo != NULL && pInfo->pTableScanOp != NULL) {
STableScanInfo* p = (STableScanInfo*) pInfo->pTableScanOp->info; STableScanInfo* p = (STableScanInfo*)pInfo->pTableScanOp->info;
if (p != NULL) { if (p != NULL) {
p->base.pTableListInfo = NULL; p->base.pTableListInfo = NULL;
} }

View File

@ -50,6 +50,7 @@ typedef struct SStreamVtableMergeHandle {
SSDataBlock* datablock; // Does not store data, only used to save the schema of input/output data blocks SSDataBlock* datablock; // Does not store data, only used to save the schema of input/output data blocks
SMultiwayMergeTreeInfo* pMergeTree; SMultiwayMergeTreeInfo* pMergeTree;
int32_t numEmptySources;
int64_t globalLatestTs; int64_t globalLatestTs;
} SStreamVtableMergeHandle; } SStreamVtableMergeHandle;
@ -90,6 +91,9 @@ static int32_t svmSourceFlushInput(SStreamVtableMergeSource* pSource, const char
// check data block size // check data block size
pBlock = pSource->pInputDataBlock; pBlock = pSource->pInputDataBlock;
if (blockDataGetNumOfRows(pBlock) == 0) {
goto _end;
}
int32_t size = blockDataGetSize(pBlock) + sizeof(int32_t) + taosArrayGetSize(pBlock->pDataBlock) * sizeof(int32_t); int32_t size = blockDataGetSize(pBlock) + sizeof(int32_t) + taosArrayGetSize(pBlock->pDataBlock) * sizeof(int32_t);
QUERY_CHECK_CONDITION(size <= getBufPageSize(pSource->pBuf), code, lino, _end, TSDB_CODE_INTERNAL_ERROR); QUERY_CHECK_CONDITION(size <= getBufPageSize(pSource->pBuf), code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
@ -123,8 +127,6 @@ _end:
static int32_t svmSourceAddBlock(SStreamVtableMergeSource* pSource, SSDataBlock* pDataBlock, const char* idstr) { static int32_t svmSourceAddBlock(SStreamVtableMergeSource* pSource, SSDataBlock* pDataBlock, const char* idstr) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
int32_t pageSize = 0;
int32_t holdSize = 0;
SSDataBlock* pInputDataBlock = NULL; SSDataBlock* pInputDataBlock = NULL;
QUERY_CHECK_NULL(pDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA); QUERY_CHECK_NULL(pDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
@ -139,21 +141,25 @@ static int32_t svmSourceAddBlock(SStreamVtableMergeSource* pSource, SSDataBlock*
int32_t start = 0; int32_t start = 0;
int32_t nrows = blockDataGetNumOfRows(pDataBlock); int32_t nrows = blockDataGetNumOfRows(pDataBlock);
int32_t pageSize =
getBufPageSize(pSource->pBuf) - sizeof(int32_t) - taosArrayGetSize(pInputDataBlock->pDataBlock) * sizeof(int32_t);
while (start < nrows) { while (start < nrows) {
int32_t holdSize = blockDataGetSize(pInputDataBlock); int32_t holdSize = blockDataGetSize(pInputDataBlock);
QUERY_CHECK_CONDITION(holdSize < pageSize, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); QUERY_CHECK_CONDITION(holdSize < pageSize, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
int32_t stop = 0; int32_t stop = start;
code = blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pageSize - holdSize); code = blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pageSize - holdSize);
QUERY_CHECK_CODE(code, lino, _end);
if (stop == start - 1) { if (stop == start - 1) {
// If pInputDataBlock cannot hold new rows, ignore the error and write pInputDataBlock to the buffer // If pInputDataBlock cannot hold new rows, ignore the error and write pInputDataBlock to the buffer
} else { } else {
QUERY_CHECK_CODE(code, lino, _end);
// append new rows to pInputDataBlock // append new rows to pInputDataBlock
if (blockDataGetNumOfRows(pInputDataBlock) == 0) { if (blockDataGetNumOfRows(pInputDataBlock) == 0) {
// set expires time for the first block // set expires time for the first block
pSource->currentExpireTimeMs = taosGetTimestampMs() + tsStreamVirtualMergeMaxDelayMs; pSource->currentExpireTimeMs = taosGetTimestampMs() + tsStreamVirtualMergeMaxDelayMs;
} }
int32_t numOfRows = stop - start + 1; int32_t numOfRows = stop - start + 1;
code = blockDataEnsureCapacity(pInputDataBlock, pInputDataBlock->info.rows + numOfRows);
QUERY_CHECK_CODE(code, lino, _end);
code = blockDataMergeNRows(pInputDataBlock, pDataBlock, start, numOfRows); code = blockDataMergeNRows(pInputDataBlock, pDataBlock, start, numOfRows);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} }
@ -176,6 +182,17 @@ _end:
static bool svmSourceIsEmpty(SStreamVtableMergeSource* pSource) { return listNEles(pSource->pageInfoList) == 0; } static bool svmSourceIsEmpty(SStreamVtableMergeSource* pSource) { return listNEles(pSource->pageInfoList) == 0; }
static int64_t svmSourceGetExpireTime(SStreamVtableMergeSource* pSource) {
SListNode* pn = tdListGetHead(pSource->pageInfoList);
if (pn != NULL) {
SVMBufPageInfo* pageInfo = (SVMBufPageInfo*)pn->data;
if (pageInfo != NULL) {
return pageInfo->expireTimeMs;
}
}
return INT64_MAX;
}
static int32_t svmSourceReadBuf(SStreamVtableMergeSource* pSource, const char* idstr) { static int32_t svmSourceReadBuf(SStreamVtableMergeSource* pSource, const char* idstr) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
@ -188,6 +205,11 @@ static int32_t svmSourceReadBuf(SStreamVtableMergeSource* pSource, const char* i
QUERY_CHECK_NULL(pSource->pOutputDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA); QUERY_CHECK_NULL(pSource->pOutputDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
blockDataCleanup(pSource->pOutputDataBlock); blockDataCleanup(pSource->pOutputDataBlock);
int32_t numOfCols = taosArrayGetSize(pSource->pOutputDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; i++) {
SColumnInfoData* pCol = taosArrayGet(pSource->pOutputDataBlock->pDataBlock, i);
pCol->hasNull = true;
}
pn = tdListGetHead(pSource->pageInfoList); pn = tdListGetHead(pSource->pageInfoList);
QUERY_CHECK_NULL(pn, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); QUERY_CHECK_NULL(pn, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
@ -215,21 +237,14 @@ static int32_t svmSourceCurrentTs(SStreamVtableMergeSource* pSource, const char*
SColumnInfoData* tsCol = NULL; SColumnInfoData* tsCol = NULL;
QUERY_CHECK_NULL(pSource, code, lino, _end, TSDB_CODE_INVALID_PARA); QUERY_CHECK_NULL(pSource, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_CONDITION(!svmSourceIsEmpty(pSource), code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pSource->pOutputDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA); QUERY_CHECK_NULL(pSource->pOutputDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_CONDITION(pSource->rowIndex >= 0 && pSource->rowIndex < blockDataGetNumOfRows(pSource->pOutputDataBlock),
if (blockDataGetNumOfRows(pSource->pOutputDataBlock) == 0) { code, lino, _end, TSDB_CODE_INVALID_PARA);
code = svmSourceReadBuf(pSource, idstr);
QUERY_CHECK_CODE(code, lino, _end);
}
QUERY_CHECK_CONDITION(pSource->rowIndex < blockDataGetNumOfRows(pSource->pOutputDataBlock), code, lino, _end,
TSDB_CODE_INVALID_PARA);
tsCol = taosArrayGet(pSource->pOutputDataBlock->pDataBlock, 0); tsCol = taosArrayGet(pSource->pOutputDataBlock->pDataBlock, 0);
QUERY_CHECK_NULL(tsCol, code, lino, _end, terrno); QUERY_CHECK_NULL(tsCol, code, lino, _end, terrno);
*pTs = ((int64_t*)tsCol->pData)[pSource->rowIndex]; *pTs = ((int64_t*)tsCol->pData)[pSource->rowIndex];
pSource->latestTs = TMAX(*pTs, pSource->latestTs);
_end: _end:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -238,55 +253,54 @@ _end:
return code; return code;
} }
static int32_t svmSourceMoveNext(SStreamVtableMergeSource* pSource, const char* idstr, SVM_NEXT_RESULT* pRes) { static int32_t svmSourceMoveNext(SStreamVtableMergeSource* pSource, const char* idstr) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
SListNode* pn = NULL; SListNode* pn = NULL;
void* page = NULL; void* page = NULL;
int64_t latestTs = 0;
QUERY_CHECK_NULL(pSource, code, lino, _end, TSDB_CODE_INVALID_PARA); QUERY_CHECK_NULL(pSource, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pSource->pOutputDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA); QUERY_CHECK_NULL(pSource->pOutputDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
*pRes = SVM_NEXT_NOT_READY;
latestTs = pSource->latestTs;
while (true) { while (true) {
if (svmSourceIsEmpty(pSource)) { if (pSource->rowIndex >= 0) {
pSource->rowIndex = 0; QUERY_CHECK_CONDITION(pSource->rowIndex < blockDataGetNumOfRows(pSource->pOutputDataBlock), code, lino, _end,
break; TSDB_CODE_INVALID_PARA);
pSource->rowIndex++;
if (pSource->rowIndex >= blockDataGetNumOfRows(pSource->pOutputDataBlock)) {
// Pop the page from the list and recycle it
pn = tdListPopHead(pSource->pageInfoList);
QUERY_CHECK_NULL(pn, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
QUERY_CHECK_NULL(pn->data, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
SVMBufPageInfo* pageInfo = (SVMBufPageInfo*)pn->data;
page = getBufPage(pSource->pBuf, pageInfo->pageId);
QUERY_CHECK_NULL(page, code, lino, _end, terrno);
code = dBufSetBufPageRecycled(pSource->pBuf, page);
QUERY_CHECK_CODE(code, lino, _end);
(*pSource->pTotalPages)--;
taosMemoryFreeClear(pn);
pSource->rowIndex = -1;
}
} }
QUERY_CHECK_CONDITION(pSource->rowIndex < blockDataGetNumOfRows(pSource->pOutputDataBlock), code, lino, _end, if (pSource->rowIndex == -1) {
TSDB_CODE_INVALID_PARA); if (svmSourceIsEmpty(pSource)) {
break;
pSource->rowIndex++; }
if (pSource->rowIndex >= blockDataGetNumOfRows(pSource->pOutputDataBlock)) { // Read the first page from the list
// Pop the page from the list and recycle it code = svmSourceReadBuf(pSource, idstr);
pn = tdListPopHead(pSource->pageInfoList);
QUERY_CHECK_NULL(pn, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
QUERY_CHECK_NULL(pn->data, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
SVMBufPageInfo* pageInfo = (SVMBufPageInfo*)pn->data;
page = getBufPage(pSource->pBuf, pageInfo->pageId);
QUERY_CHECK_NULL(page, code, lino, _end, terrno);
code = dBufSetBufPageRecycled(pSource->pBuf, page);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
(*pSource->pTotalPages)--;
taosMemoryFreeClear(pn);
pSource->rowIndex = 0; pSource->rowIndex = 0;
} }
if (svmSourceIsEmpty(pSource)) { // Check the timestamp of the current row
pSource->rowIndex = 0; int64_t currentTs = INT64_MIN;
break; code = svmSourceCurrentTs(pSource, idstr, &currentTs);
} if (currentTs > pSource->latestTs) {
pSource->latestTs = currentTs;
int64_t ts = 0; if (currentTs >= *pSource->pGlobalLatestTs) {
code = svmSourceCurrentTs(pSource, idstr, &ts); break;
QUERY_CHECK_CODE(code, lino, _end); }
if (ts > latestTs && ts >= *pSource->pGlobalLatestTs) {
*pRes = SVM_NEXT_FOUND;
break;
} }
} }
@ -306,6 +320,12 @@ static int32_t svmSourceCompare(const void* pLeft, const void* pRight, void* par
SStreamVtableMergeSource* pLeftSource = *(SStreamVtableMergeSource**)taosArrayGet(pValidSources, left); SStreamVtableMergeSource* pLeftSource = *(SStreamVtableMergeSource**)taosArrayGet(pValidSources, left);
SStreamVtableMergeSource* pRightSource = *(SStreamVtableMergeSource**)taosArrayGet(pValidSources, right); SStreamVtableMergeSource* pRightSource = *(SStreamVtableMergeSource**)taosArrayGet(pValidSources, right);
if (svmSourceIsEmpty(pLeftSource)) {
return 1;
} else if (svmSourceIsEmpty(pRightSource)) {
return -1;
}
int64_t leftTs = 0; int64_t leftTs = 0;
code = svmSourceCurrentTs(pLeftSource, "", &leftTs); code = svmSourceCurrentTs(pLeftSource, "", &leftTs);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -339,6 +359,7 @@ static SStreamVtableMergeSource* svmAddSource(SStreamVtableMergeHandle* pHandle,
QUERY_CHECK_NULL(pSource->pageInfoList, code, lino, _end, terrno); QUERY_CHECK_NULL(pSource->pageInfoList, code, lino, _end, terrno);
code = createOneDataBlock(pHandle->datablock, false, &pSource->pOutputDataBlock); code = createOneDataBlock(pHandle->datablock, false, &pSource->pOutputDataBlock);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
pSource->rowIndex = -1;
pSource->latestTs = INT64_MIN; pSource->latestTs = INT64_MIN;
pSource->pGlobalLatestTs = &pHandle->globalLatestTs; pSource->pGlobalLatestTs = &pHandle->globalLatestTs;
code = taosHashPut(pHandle->pSources, &uid, sizeof(uid), &pSource, POINTER_BYTES); code = taosHashPut(pHandle->pSources, &uid, sizeof(uid), &pSource, POINTER_BYTES);
@ -387,14 +408,16 @@ static int32_t svmBuildTree(SStreamVtableMergeHandle* pHandle, SVM_NEXT_RESULT*
pIter = taosHashIterate(pHandle->pSources, NULL); pIter = taosHashIterate(pHandle->pSources, NULL);
while (pIter != NULL) { while (pIter != NULL) {
SStreamVtableMergeSource* pSource = *(SStreamVtableMergeSource**)pIter; SStreamVtableMergeSource* pSource = *(SStreamVtableMergeSource**)pIter;
if (svmSourceIsEmpty(pSource)) { code = svmSourceFlushInput(pSource, idstr);
code = svmSourceFlushInput(pSource, idstr); QUERY_CHECK_CODE(code, lino, _end);
if (pSource->rowIndex == -1) {
code = svmSourceMoveNext(pSource, idstr);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} }
if (!svmSourceIsEmpty(pSource)) { if (!svmSourceIsEmpty(pSource)) {
px = taosArrayPush(pReadySources, &pSource); px = taosArrayPush(pReadySources, &pSource);
QUERY_CHECK_NULL(px, code, lino, _end, terrno); QUERY_CHECK_NULL(px, code, lino, _end, terrno);
globalExpireTimeMs = TMIN(globalExpireTimeMs, pSource->currentExpireTimeMs); globalExpireTimeMs = TMIN(globalExpireTimeMs, svmSourceGetExpireTime(pSource));
} }
pIter = taosHashIterate(pHandle->pSources, pIter); pIter = taosHashIterate(pHandle->pSources, pIter);
} }
@ -427,6 +450,7 @@ static int32_t svmBuildTree(SStreamVtableMergeHandle* pHandle, SVM_NEXT_RESULT*
void* param = NULL; void* param = NULL;
code = tMergeTreeCreate(&pHandle->pMergeTree, taosArrayGetSize(pReadySources), pReadySources, svmSourceCompare); code = tMergeTreeCreate(&pHandle->pMergeTree, taosArrayGetSize(pReadySources), pReadySources, svmSourceCompare);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
pHandle->numEmptySources = 0;
pReadySources = NULL; pReadySources = NULL;
*pRes = SVM_NEXT_FOUND; *pRes = SVM_NEXT_FOUND;
@ -453,7 +477,7 @@ int32_t streamVtableMergeAddBlock(SStreamVtableMergeHandle* pHandle, SSDataBlock
QUERY_CHECK_NULL(pHandle, code, lino, _end, TSDB_CODE_INVALID_PARA); QUERY_CHECK_NULL(pHandle, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA); QUERY_CHECK_NULL(pDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
pTbUid = pDataBlock->info.id.uid; pTbUid = pDataBlock->info.id.groupId;
px = taosHashGet(pHandle->pSources, &pTbUid, sizeof(int64_t)); px = taosHashGet(pHandle->pSources, &pTbUid, sizeof(int64_t));
if (px == NULL) { if (px == NULL) {
@ -480,8 +504,31 @@ _end:
return code; return code;
} }
int32_t streamVtableMergeNextTuple(SStreamVtableMergeHandle* pHandle, SSDataBlock* pResBlock, SVM_NEXT_RESULT* pRes, int32_t streamVtableMergeCurrent(SStreamVtableMergeHandle* pHandle, SSDataBlock** ppDataBlock, int32_t* pRowIdx,
const char* idstr) { const char* idstr) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
QUERY_CHECK_NULL(pHandle, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pHandle->pMergeTree, code, lino, _end, TSDB_CODE_INVALID_PARA);
int32_t idx = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
SArray* pReadySources = pHandle->pMergeTree->param;
void* px = taosArrayGet(pReadySources, idx);
QUERY_CHECK_NULL(px, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
SStreamVtableMergeSource* pSource = *(SStreamVtableMergeSource**)px;
QUERY_CHECK_NULL(pSource, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
*ppDataBlock = pSource->pOutputDataBlock;
*pRowIdx = pSource->rowIndex;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
}
return code;
}
int32_t streamVtableMergeMoveNext(SStreamVtableMergeHandle* pHandle, SVM_NEXT_RESULT* pRes, const char* idstr) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
void* px = NULL; void* px = NULL;
@ -489,61 +536,74 @@ int32_t streamVtableMergeNextTuple(SStreamVtableMergeHandle* pHandle, SSDataBloc
SStreamVtableMergeSource* pSource = NULL; SStreamVtableMergeSource* pSource = NULL;
QUERY_CHECK_NULL(pHandle, code, lino, _end, TSDB_CODE_INVALID_PARA); QUERY_CHECK_NULL(pHandle, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pResBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pRes, code, lino, _end, TSDB_CODE_INVALID_PARA); QUERY_CHECK_NULL(pRes, code, lino, _end, TSDB_CODE_INVALID_PARA);
*pRes = SVM_NEXT_NOT_READY; *pRes = SVM_NEXT_NOT_READY;
if (pHandle->pMergeTree == NULL) { if (pHandle->pMergeTree == NULL) {
SVM_NEXT_RESULT buildRes = SVM_NEXT_NOT_READY; code = svmBuildTree(pHandle, pRes, idstr);
code = svmBuildTree(pHandle, &buildRes, idstr);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (buildRes == SVM_NEXT_NOT_READY) { goto _end;
goto _end;
}
} }
QUERY_CHECK_NULL(pHandle->pMergeTree, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
int32_t idx = tMergeTreeGetChosenIndex(pHandle->pMergeTree); int32_t idx = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
pReadySources = pHandle->pMergeTree->param; pReadySources = pHandle->pMergeTree->param;
px = taosArrayGet(pReadySources, idx); px = taosArrayGet(pReadySources, idx);
QUERY_CHECK_NULL(px, code, lino, _end, terrno); QUERY_CHECK_NULL(px, code, lino, _end, terrno);
pSource = *(SStreamVtableMergeSource**)px; pSource = *(SStreamVtableMergeSource**)px;
code = blockCopyOneRow(pSource->pOutputDataBlock, pSource->rowIndex, &pResBlock);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
*pRes = SVM_NEXT_FOUND;
pHandle->globalLatestTs = TMAX(pSource->latestTs, pHandle->globalLatestTs); pHandle->globalLatestTs = TMAX(pSource->latestTs, pHandle->globalLatestTs);
SVM_NEXT_RESULT nextRes = SVM_NEXT_NOT_READY; int32_t origNumOfPages = pHandle->numOfPages;
int32_t origNumOfPages = pHandle->numOfPages; code = svmSourceMoveNext(pSource, idstr);
code = svmSourceMoveNext(pSource, idstr, &nextRes);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
bool needDestroy = false; if (svmSourceIsEmpty(pSource)) {
if (nextRes == SVM_NEXT_NOT_READY) { ++pHandle->numEmptySources;
needDestroy = true;
} else if (taosArrayGetSize((SArray*)pHandle->pMergeTree->param) != pHandle->nSrcTbls &&
pHandle->numOfPages != origNumOfPages) {
// The original data for this portion is incomplete. Its merge was forcibly triggered by certain conditions, so we
// must recheck if those conditions are still met.
if (tsStreamVirtualMergeWaitMode == STREAM_VIRTUAL_MERGE_MAX_DELAY) {
int64_t globalExpireTimeMs = INT64_MAX;
for (int32_t i = 0; i < taosArrayGetSize(pReadySources); ++i) {
px = taosArrayGet(pReadySources, i);
QUERY_CHECK_NULL(px, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
pSource = *(SStreamVtableMergeSource**)px;
globalExpireTimeMs = TMIN(globalExpireTimeMs, pSource->currentExpireTimeMs);
}
needDestroy = taosGetTimestampMs() < globalExpireTimeMs;
} else if (tsStreamVirtualMergeWaitMode == STREAM_VIRTUAL_MERGE_MAX_MEMORY) {
needDestroy = pHandle->numOfPages < pHandle->numPageLimit;
} else {
code = TSDB_CODE_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
} }
bool needDestroy = false;
if (pHandle->numEmptySources == taosArrayGetSize(pReadySources)) {
// all sources are empty
needDestroy = true;
} else {
code = tMergeTreeAdjust(pHandle->pMergeTree, tMergeTreeGetAdjustIndex(pHandle->pMergeTree));
QUERY_CHECK_CODE(code, lino, _end);
if (pHandle->numEmptySources > 0) {
if (tsStreamVirtualMergeWaitMode == STREAM_VIRTUAL_MERGE_WAIT_FOREVER) {
idx = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
px = taosArrayGet(pReadySources, idx);
QUERY_CHECK_NULL(px, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
pSource = *(SStreamVtableMergeSource**)px;
QUERY_CHECK_NULL(pSource, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
int64_t currentTs = INT64_MIN;
code = svmSourceCurrentTs(pSource, idstr, &currentTs);
QUERY_CHECK_CODE(code, lino, _end);
needDestroy = currentTs > pHandle->globalLatestTs;
} else if (pHandle->numOfPages != origNumOfPages) {
// The original data for this portion is incomplete. Its merge was forcibly triggered by certain conditions, so
// we must recheck if those conditions are still met.
if (tsStreamVirtualMergeWaitMode == STREAM_VIRTUAL_MERGE_MAX_DELAY) {
int64_t globalExpireTimeMs = INT64_MAX;
for (int32_t i = 0; i < taosArrayGetSize(pReadySources); ++i) {
px = taosArrayGet(pReadySources, i);
QUERY_CHECK_NULL(px, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
pSource = *(SStreamVtableMergeSource**)px;
globalExpireTimeMs = TMIN(globalExpireTimeMs, svmSourceGetExpireTime(pSource));
}
needDestroy = taosGetTimestampMs() < globalExpireTimeMs;
} else if (tsStreamVirtualMergeWaitMode == STREAM_VIRTUAL_MERGE_MAX_MEMORY) {
needDestroy = pHandle->numOfPages < pHandle->numPageLimit;
} else {
code = TSDB_CODE_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
}
}
}
if (needDestroy) { if (needDestroy) {
svmDestroyTree(&pHandle->pMergeTree); svmDestroyTree(&pHandle->pMergeTree);
} else {
*pRes = SVM_NEXT_FOUND;
} }
_end: _end:
@ -590,7 +650,8 @@ _end:
return code; return code;
} }
void streamVtableMergeDestroyHandle(SStreamVtableMergeHandle** ppHandle) { void streamVtableMergeDestroyHandle(void* ptr) {
SStreamVtableMergeHandle** ppHandle = ptr;
if (ppHandle == NULL || *ppHandle == NULL) { if (ppHandle == NULL || *ppHandle == NULL) {
return; return;
} }
@ -600,7 +661,7 @@ void streamVtableMergeDestroyHandle(SStreamVtableMergeHandle** ppHandle) {
taosHashCleanup(pHandle->pSources); taosHashCleanup(pHandle->pSources);
pHandle->pSources = NULL; pHandle->pSources = NULL;
} }
blockDataDestroy(pHandle->datablock);
svmDestroyTree(&pHandle->pMergeTree); svmDestroyTree(&pHandle->pMergeTree);
taosMemoryFreeClear(*ppHandle); taosMemoryFreeClear(*ppHandle);

View File

@ -680,499 +680,270 @@ int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, SReadHa
STableListInfo* pTableListInfo, int32_t numOfDownstream, STableListInfo* pTableListInfo, int32_t numOfDownstream,
SVirtualScanPhysiNode* pVirtualScanPhyNode, SExecTaskInfo* pTaskInfo, SVirtualScanPhysiNode* pVirtualScanPhyNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) { SOperatorInfo** pOptrInfo) {
SPhysiNode* pPhyNode = (SPhysiNode*)pVirtualScanPhyNode; SPhysiNode* pPhyNode = (SPhysiNode*)pVirtualScanPhyNode;
int32_t lino = 0; int32_t lino = 0;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SVirtualScanMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SVirtualScanMergeOperatorInfo)); SVirtualScanMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SVirtualScanMergeOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc;
SNodeList* pMergeKeys = NULL; SNodeList* pMergeKeys = NULL;
QUERY_CHECK_NULL(pInfo, code, lino, _return, terrno); QUERY_CHECK_NULL(pInfo, code, lino, _return, terrno);
QUERY_CHECK_NULL(pOperator, code, lino, _return, terrno); QUERY_CHECK_NULL(pOperator, code, lino, _return, terrno);
pInfo->binfo.inputTsOrder = pVirtualScanPhyNode->scan.node.inputTsOrder; pInfo->binfo.inputTsOrder = pVirtualScanPhyNode->scan.node.inputTsOrder;
pInfo->binfo.outputTsOrder = pVirtualScanPhyNode->scan.node.outputTsOrder; pInfo->binfo.outputTsOrder = pVirtualScanPhyNode->scan.node.outputTsOrder;
SVirtualTableScanInfo* pVirtualScanInfo = &pInfo->virtualScanInfo; SVirtualTableScanInfo* pVirtualScanInfo = &pInfo->virtualScanInfo;
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _return, terrno); TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _return, terrno);
SSDataBlock* pInputBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pInputBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
TSDB_CHECK_NULL(pInputBlock, code, lino, _return, terrno); TSDB_CHECK_NULL(pInputBlock, code, lino, _return, terrno);
pVirtualScanInfo->pInputBlock = pInputBlock; pVirtualScanInfo->pInputBlock = pInputBlock;
if (pVirtualScanPhyNode->scan.pScanPseudoCols != NULL) { if (pVirtualScanPhyNode->scan.pScanPseudoCols != NULL) {
SExprSupp* pSup = &pVirtualScanInfo->base.pseudoSup; SExprSupp* pSup = &pVirtualScanInfo->base.pseudoSup;
pSup->pExprInfo = NULL; pSup->pExprInfo = NULL;
VTS_ERR_JRET(createExprInfo(pVirtualScanPhyNode->scan.pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs)); VTS_ERR_JRET(createExprInfo(pVirtualScanPhyNode->scan.pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs));
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
&pTaskInfo->storageAPI.functionStore); &pTaskInfo->storageAPI.functionStore);
TSDB_CHECK_NULL(pSup->pCtx, code, lino, _return, terrno); TSDB_CHECK_NULL(pSup->pCtx, code, lino, _return, terrno);
} }
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
TSDB_CHECK_CODE(blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity), lino, _return); TSDB_CHECK_CODE(blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity), lino, _return);
size_t numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock); size_t numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock);
int32_t rowSize = pInfo->binfo.pRes->info.rowSize; int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
if (!pVirtualScanPhyNode->scan.node.dynamicOp) { if (!pVirtualScanPhyNode->scan.node.dynamicOp) {
VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, 0)); VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, 0));
pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys); pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys);
TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno); TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno);
} }
pVirtualScanInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols); pVirtualScanInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols);
pVirtualScanInfo->sortBufSize = pVirtualScanInfo->sortBufSize =
pVirtualScanInfo->bufPageSize * (numOfDownstream + 1); // one additional is reserved for merged result. pVirtualScanInfo->bufPageSize * (numOfDownstream + 1); // one additional is reserved for merged result.
VTS_ERR_JRET(extractColMap(pVirtualScanPhyNode->pTargets, &pVirtualScanInfo->dataSlotMap, &pVirtualScanInfo->tsSlotId)); VTS_ERR_JRET(
extractColMap(pVirtualScanPhyNode->pTargets, &pVirtualScanInfo->dataSlotMap, &pVirtualScanInfo->tsSlotId));
pVirtualScanInfo->scanAllCols = pVirtualScanPhyNode->scanAllCols; pVirtualScanInfo->scanAllCols = pVirtualScanPhyNode->scanAllCols;
VTS_ERR_JRET(filterInitFromNode((SNode*)pVirtualScanPhyNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0)); VTS_ERR_JRET(
filterInitFromNode((SNode*)pVirtualScanPhyNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0));
pVirtualScanInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5); pVirtualScanInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
QUERY_CHECK_NULL(pVirtualScanInfo->base.metaCache.pTableMetaEntryCache, code, lino, _return, terrno); QUERY_CHECK_NULL(pVirtualScanInfo->base.metaCache.pTableMetaEntryCache, code, lino, _return, terrno);
pVirtualScanInfo->base.readHandle = *readHandle; pVirtualScanInfo->base.readHandle = *readHandle;
pVirtualScanInfo->base.pTableListInfo = pTableListInfo; pVirtualScanInfo->base.pTableListInfo = pTableListInfo;
setOperatorInfo(pOperator, "VirtualTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN, false, setOperatorInfo(pOperator, "VirtualTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN, false,
OP_NOT_OPENED, pInfo, pTaskInfo); OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(openVirtualTableScanOperator, virtualTableGetNext, NULL, destroyVirtualTableScanOperatorInfo, createOperatorFpSet(openVirtualTableScanOperator, virtualTableGetNext, NULL, destroyVirtualTableScanOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
if (NULL != pDownstream) { if (NULL != pDownstream) {
VTS_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream)); VTS_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
} }
nodesDestroyList(pMergeKeys);
nodesDestroyList(pMergeKeys); *pOptrInfo = pOperator;
*pOptrInfo = pOperator; return TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS;
_return: _return:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
} }
if (pInfo != NULL) { if (pInfo != NULL) {
destroyVirtualTableScanOperatorInfo(pInfo); destroyVirtualTableScanOperatorInfo(pInfo);
} }
nodesDestroyList(pMergeKeys); nodesDestroyList(pMergeKeys);
pTaskInfo->code = code; pTaskInfo->code = code;
destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream); destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
return code; return code;
} }
static int32_t doStreamVtableMergeNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { static int32_t doStreamVtableMergeNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
// NOTE: this operator does never check if current status is done or not int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0;
int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; const char* id = GET_TASKID(pTaskInfo);
const char* id = GET_TASKID(pTaskInfo);
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo; SSDataBlock* pResBlock = pInfo->pRes;
SArray* pVTables = pTaskInfo->streamInfo.pVTables;
void* pIter = NULL;
qDebug("stream scan started, %s", id); (*ppRes) = NULL;
if (pOperator->status == OP_EXEC_DONE) {
goto _end;
}
qDebug("===stream=== stream vtable merge next, taskId:%s", id);
// TODO(kjq): add fill history recover step // TODO(kjq): add fill history recover step
size_t total = taosArrayGetSize(pInfo->pBlockLists); if (pInfo->pVtableMergeHandles == NULL) {
// TODO: refactor pInfo->pVtableMergeHandles = taosHashInit(taosArrayGetSize(pVTables),
FETCH_NEXT_BLOCK: taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { QUERY_CHECK_NULL(pInfo->pVtableMergeHandles, code, lino, _end, terrno);
if (pInfo->validBlockIndex >= total) { taosHashSetFreeFp(pInfo->pVtableMergeHandles, streamVtableMergeDestroyHandle);
doClearBufferedBlocks(pInfo);
(*ppRes) = NULL;
return code;
}
int32_t current = pInfo->validBlockIndex++; int32_t nTables = taosArrayGetSize(pVTables);
qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, id); int32_t numPagePerTable = getNumOfInMemBufPages(pInfo->pVtableMergeBuf) / nTables;
for (int32_t i = 0; i < nTables; ++i) {
SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current); SVCTableMergeInfo* pTableInfo = taosArrayGet(pVTables, i);
QUERY_CHECK_NULL(pPacked, code, lino, _end, terrno); if (pTableInfo == NULL) {
SSDataBlock* pBlock = pPacked->pDataBlock;
if (pBlock->info.parTbName[0]) {
code =
pAPI->stateStore.streamStatePutParName(pStreamInfo->pState, pBlock->info.id.groupId, pBlock->info.parTbName);
QUERY_CHECK_CODE(code, lino, _end);
}
// TODO move into scan
pBlock->info.calWin.skey = INT64_MIN;
pBlock->info.calWin.ekey = INT64_MAX;
pBlock->info.dataLoad = 1;
if (pInfo->pUpdateInfo) {
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
}
code = blockDataUpdateTsWindow(pBlock, 0);
QUERY_CHECK_CODE(code, lino, _end);
switch (pBlock->info.type) {
case STREAM_NORMAL:
case STREAM_GET_ALL:
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
(*ppRes) = pBlock;
return code;
case STREAM_RETRIEVE: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
code = copyDataBlock(pInfo->pUpdateRes, pBlock);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->updateResIndex = 0;
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
pAPI->stateStore.updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
} break;
case STREAM_DELETE_DATA: {
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "delete recv", GET_TASKID(pTaskInfo));
SSDataBlock* pDelBlock = NULL;
if (pInfo->tqReader) {
code = createSpecialDataBlock(STREAM_DELETE_DATA, &pDelBlock);
QUERY_CHECK_CODE(code, lino, _end);
code = filterDelBlockByUid(pDelBlock, pBlock, pInfo->tqReader, &pInfo->readerFn);
QUERY_CHECK_CODE(code, lino, _end);
} else {
pDelBlock = pBlock;
}
code = setBlockGroupIdByUid(pInfo, pDelBlock);
QUERY_CHECK_CODE(code, lino, _end);
code = rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id);
QUERY_CHECK_CODE(code, lino, _end);
printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete recv filtered",
GET_TASKID(pTaskInfo));
if (pDelBlock->info.rows == 0) {
if (pInfo->tqReader) {
blockDataDestroy(pDelBlock);
}
goto FETCH_NEXT_BLOCK;
}
if (!isStreamWindow(pInfo)) {
code = generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->partitionSup.needCalc) {
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
} else {
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
}
blockDataDestroy(pDelBlock);
if (pInfo->pDeleteDataRes->info.rows > 0) {
printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result",
GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pInfo->pDeleteDataRes->info.type);
(*ppRes) = pInfo->pDeleteDataRes;
return code;
} else {
goto FETCH_NEXT_BLOCK;
}
} else {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->updateResIndex = 0;
code = generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes, STREAM_DELETE_DATA);
QUERY_CHECK_CODE(code, lino, _end);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
code = copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
if (pInfo->tqReader) {
blockDataDestroy(pDelBlock);
}
if (pInfo->pDeleteDataRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result",
GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pInfo->pDeleteDataRes->info.type);
(*ppRes) = pInfo->pDeleteDataRes;
return code;
} else {
goto FETCH_NEXT_BLOCK;
}
}
} break;
case STREAM_GET_RESULT: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->updateResIndex = 0;
pInfo->lastScanRange = pBlock->info.window;
TSKEY endKey = taosTimeGetIntervalEnd(pBlock->info.window.skey, &pInfo->interval);
if (pInfo->useGetResultRange == true) {
endKey = pBlock->info.window.ekey;
}
code = copyGetResultBlock(pInfo->pUpdateRes, pBlock->info.window.skey, endKey);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->pUpdateInfo->maxDataVersion = -1;
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
} break;
case STREAM_DROP_CHILD_TABLE: {
int32_t deleteNum = 0;
code = deletePartName(&pInfo->stateStore, pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, pBlock, &deleteNum);
QUERY_CHECK_CODE(code, lino, _end);
if (deleteNum == 0) {
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "block recv", GET_TASKID(pTaskInfo));
qDebug("===stream=== ignore block type 18, delete num is 0");
goto FETCH_NEXT_BLOCK;
}
} break;
case STREAM_CHECKPOINT: {
qError("stream check point error. msg type: STREAM_INPUT__DATA_BLOCK");
} break;
default:
break;
}
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "block recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
(*ppRes) = pBlock;
return code;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
qDebug("stream scan mode:%d, %s", pInfo->scanMode, id);
switch (pInfo->scanMode) {
case STREAM_SCAN_FROM_RES: {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
code = doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
QUERY_CHECK_CODE(code, lino, _end);
setStreamOperatorState(&pInfo->basic, pInfo->pRes->info.type);
code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->pRes->info.dataLoad = 1;
code = blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->pRes;
return code;
}
} break;
case STREAM_SCAN_FROM_DELETE_DATA: {
code = generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_PARTITION_DELETE_DATA);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->pUpdateRes->info.rows > 0) {
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
code = copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
(*ppRes) = pInfo->pDeleteDataRes;
return code;
}
qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__,
__LINE__);
blockDataCleanup(pInfo->pUpdateDataRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} break;
case STREAM_SCAN_FROM_UPDATERES: {
code = generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_CLEAR);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->pUpdateRes->info.rows > 0) {
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
(*ppRes) = pInfo->pUpdateRes;
return code;
}
qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__,
__LINE__);
blockDataCleanup(pInfo->pUpdateDataRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} break;
case STREAM_SCAN_FROM_DATAREADER_RANGE:
case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
if (pInfo->pRangeScanRes != NULL) {
(*ppRes) = pInfo->pRangeScanRes;
pInfo->pRangeScanRes = NULL;
return code;
}
SSDataBlock* pSDB = NULL;
code = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex, &pSDB);
QUERY_CHECK_CODE(code, lino, _end);
if (pSDB) {
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
code = checkUpdateData(pInfo, true, pSDB, false);
QUERY_CHECK_CODE(code, lino, _end);
}
printSpecDataBlock(pSDB, getStreamOpName(pOperator->operatorType), "update", GET_TASKID(pTaskInfo));
code = calBlockTbName(pInfo, pSDB, 0);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->pCreateTbRes->info.rows > 0) {
printSpecDataBlock(pInfo->pCreateTbRes, getStreamOpName(pOperator->operatorType), "update",
GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->pCreateTbRes;
pInfo->pRangeScanRes = pSDB;
return code;
}
(*ppRes) = pSDB;
return code;
}
blockDataCleanup(pInfo->pUpdateDataRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} break;
default:
break;
}
if (hasScanRange(pInfo)) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
pInfo->updateResIndex = 0;
SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
code = copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
QUERY_CHECK_CODE(code, lino, _end);
blockDataCleanup(pSup->pScanBlock);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
printSpecDataBlock(pInfo->pUpdateRes, getStreamOpName(pOperator->operatorType), "rebuild", GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->pUpdateRes;
return code;
}
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
NEXT_SUBMIT_BLK:
while (1) {
if (pInfo->readerFn.tqReaderCurrentBlockConsumed(pInfo->tqReader)) {
if (pInfo->validBlockIndex >= totalBlocks) {
pAPI->stateStore.updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
doClearBufferedBlocks(pInfo);
qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id);
(*ppRes) = NULL;
return code;
}
int32_t current = pInfo->validBlockIndex++;
SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
QUERY_CHECK_NULL(pSubmit, code, lino, _end, terrno);
qDebug("set %d/%d as the input submit block, %s", current + 1, totalBlocks, id);
if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver,
NULL) < 0) {
qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current,
totalBlocks, id);
continue;
}
}
blockDataCleanup(pInfo->pRes);
while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) {
SSDataBlock* pRes = NULL;
code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id);
qDebug("retrieve data from submit completed code:%s rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows, id);
if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) {
qDebug("retrieve data failed, try next block in submit block, %s", id);
continue;
}
code = setBlockIntoRes(pInfo, pRes, &pStreamInfo->fillHistoryWindow, false);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
pInfo->pRes->info.rows = 0;
code = TSDB_CODE_SUCCESS;
} else {
QUERY_CHECK_CODE(code, lino, _end);
}
if (pInfo->pRes->info.rows == 0) {
continue;
}
if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
qDebug("create table res exists, rows:%" PRId64 " return from stream scan, %s",
pInfo->pCreateTbRes->info.rows, id);
(*ppRes) = pInfo->pCreateTbRes;
return code;
}
code = doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
QUERY_CHECK_CODE(code, lino, _end);
setStreamOperatorState(&pInfo->basic, pInfo->pRes->info.type);
code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
code = blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
QUERY_CHECK_CODE(code, lino, _end);
int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows;
qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes);
if (pBlockInfo->rows > 0 || numOfUpdateRes > 0) {
break;
}
}
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
break;
} else {
continue; continue;
} }
QUERY_CHECK_CONDITION(pTableInfo->numOfSrcTbls <= numPagePerTable, code, lino, _end, terrno);
SStreamVtableMergeHandle* pMergeHandle = NULL;
code = streamVtableMergeCreateHandle(&pMergeHandle, pTableInfo->numOfSrcTbls, numPagePerTable,
pInfo->pVtableMergeBuf, pInfo->pRes, id);
QUERY_CHECK_CODE(code, lino, _end);
code = taosHashPut(pInfo->pVtableMergeHandles, &pTableInfo->uid, sizeof(pTableInfo->uid), &pMergeHandle,
POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
streamVtableMergeDestroyHandle(&pMergeHandle);
}
QUERY_CHECK_CODE(code, lino, _end);
}
}
while (pOperator->status != OP_RES_TO_RETURN) {
SSDataBlock* pBlock = NULL;
SOperatorInfo* downStream = pOperator->pDownstream[0];
code = downStream->fpSet.getNextFn(downStream, &pBlock);
QUERY_CHECK_CODE(code, lino, _end);
if (pBlock == NULL) {
pOperator->status = OP_RES_TO_RETURN;
break;
} }
// record the scan action. if (pBlock->info.type == STREAM_NORMAL) {
pInfo->numOfExec++; SStreamVtableMergeHandle** ppHandle =
pOperator->resultInfo.totalRows += pBlockInfo->rows; taosHashGet(pInfo->pVtableMergeHandles, &pBlock->info.id.uid, sizeof(int64_t));
if (ppHandle == NULL) {
// skip table that is not needed
continue;
}
qDebug("stream scan completed, and return source rows:%" PRId64 ", %s", pBlockInfo->rows, id); code = streamVtableMergeAddBlock(*ppHandle, pBlock, id);
if (pBlockInfo->rows > 0) { QUERY_CHECK_CODE(code, lino, _end);
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); } else if (pBlock->info.type == STREAM_CHECKPOINT) {
(*ppRes) = pInfo->pRes;
return code;
}
if (pInfo->pUpdateDataRes->info.rows > 0) {
goto FETCH_NEXT_BLOCK;
}
goto NEXT_SUBMIT_BLK;
} else if (pInfo->blockType == STREAM_INPUT__CHECKPOINT) {
if (pInfo->validBlockIndex >= total) {
doClearBufferedBlocks(pInfo);
(*ppRes) = NULL;
return code;
}
int32_t current = pInfo->validBlockIndex++;
qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, id);
SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current);
QUERY_CHECK_NULL(pData, code, lino, _end, terrno);
SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0);
QUERY_CHECK_NULL(pBlock, code, lino, _end, terrno);
if (pBlock->info.type == STREAM_CHECKPOINT) {
// todo(kjq): serialize checkpoint // todo(kjq): serialize checkpoint
} else {
qError("unexpected block type %d, id:%s", pBlock->info.type, id);
code = TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
} }
// printDataBlock(pInfo->pCheckpointRes, "stream scan ck", GET_TASKID(pTaskInfo)); }
(*ppRes) = pInfo->pCheckpointRes;
return code; if (taosArrayGetSize(pInfo->pVtableReadyHandles) == 0) {
} else { void* pIter = taosHashIterate(pInfo->pVtableMergeHandles, NULL);
qError("stream scan error, invalid block type %d, %s", pInfo->blockType, id); while (pIter != NULL) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; SStreamVtableMergeHandle* pHandle = *(SStreamVtableMergeHandle**)pIter;
SVM_NEXT_RESULT res = SVM_NEXT_NOT_READY;
code = streamVtableMergeMoveNext(pHandle, &res, id);
QUERY_CHECK_CODE(code, lino, _end);
if (res == SVM_NEXT_FOUND) {
void* px = taosArrayPush(pInfo->pVtableReadyHandles, &pHandle);
QUERY_CHECK_NULL(px, code, lino, _end, terrno);
}
pIter = taosHashIterate(pInfo->pVtableMergeHandles, pIter);
}
}
blockDataCleanup(pResBlock);
while (true) {
void* px = taosArrayGetLast(pInfo->pVtableReadyHandles);
if (px == NULL) {
break;
}
SStreamVtableMergeHandle* pHandle = *(SStreamVtableMergeHandle**)px;
QUERY_CHECK_NULL(pHandle, code, lino, _end, terrno);
SVM_NEXT_RESULT res = SVM_NEXT_FOUND;
int32_t nCols = taosArrayGetSize(pResBlock->pDataBlock);
while (res == SVM_NEXT_FOUND) {
SSDataBlock* pBlock = NULL;
int32_t idx = 0;
code = streamVtableMergeCurrent(pHandle, &pBlock, &idx, id);
QUERY_CHECK_CODE(code, lino, _end);
bool newTuple = true;
if (pResBlock->info.rows > 0) {
SColumnInfoData* pResTsCol = taosArrayGet(pResBlock->pDataBlock, 0);
int64_t lastResTs = *(int64_t*)colDataGetNumData(pResTsCol, pResBlock->info.rows - 1);
SColumnInfoData* pMergeTsCol = taosArrayGet(pBlock->pDataBlock, 0);
int64_t mergeTs = *(int64_t*)colDataGetNumData(pMergeTsCol, idx);
QUERY_CHECK_CONDITION(mergeTs >= lastResTs, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
newTuple = (mergeTs > lastResTs);
}
if (newTuple) {
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
pResBlock->info.rows++;
for (int32_t i = 0; i < nCols; ++i) {
SColumnInfoData* pResCol = taosArrayGet(pResBlock->pDataBlock, i);
colDataSetNULL(pResCol, pResBlock->info.rows - 1);
}
}
for (int32_t i = 0; i < nCols; ++i) {
SColumnInfoData* pMergeCol = taosArrayGet(pBlock->pDataBlock, i);
if (!colDataIsNull_s(pMergeCol, idx)) {
SColumnInfoData* pResCol = taosArrayGet(pResBlock->pDataBlock, i);
code = colDataAssignNRows(pResCol, pResBlock->info.rows - 1, pMergeCol, idx, 1);
QUERY_CHECK_CODE(code, lino, _end);
}
}
code = streamVtableMergeMoveNext(pHandle, &res, id);
QUERY_CHECK_CODE(code, lino, _end);
}
if (res == SVM_NEXT_NOT_READY) {
px = taosArrayPop(pInfo->pVtableReadyHandles);
QUERY_CHECK_NULL(px, code, lino, _end, terrno);
}
if (pResBlock->info.rows > 0) {
break;
}
}
if (taosArrayGetSize(pInfo->pVtableReadyHandles) == 0) {
pOperator->status = OP_EXEC_DONE;
}
pInfo->numOfExec++;
if (pResBlock->info.rows > 0) {
(*ppRes) = pResBlock;
pOperator->resultInfo.totalRows += pResBlock->info.rows;
} }
_end: _end:
if (pIter != NULL) {
taosHashCancelIterate(pInfo->pVtableMergeHandles, pIter);
pIter = NULL;
}
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code; pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
(*ppRes) = NULL;
return code; return code;
} }
int32_t createStreamVtableMergeOperatorInfo(SReadHandle* pHandle, SVirtualScanPhysiNode* pVirtualScanNode, int32_t createStreamVtableMergeOperatorInfo(SOperatorInfo* pDownstream, SReadHandle* pHandle,
SNode* pTagCond, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { SVirtualScanPhysiNode* pVirtualScanNode, SNode* pTagCond,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo); QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1272,9 +1043,18 @@ int32_t createStreamVtableMergeOperatorInfo(SReadHandle* pHandle, SVirtualScanPh
pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->pRes = createDataBlockFromDescNode(pDescNode);
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
code = blockDataEnsureCapacity(pInfo->pRes, TMAX(pOperator->resultInfo.capacity, 4096));
QUERY_CHECK_CODE(code, lino, _error);
code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes); code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
int32_t pageSize = getProperSortPageSize(pInfo->pRes->info.rowSize, taosArrayGetSize(pInfo->pRes->pDataBlock));
code = createDiskbasedBuf(&pInfo->pVtableMergeBuf, pageSize, tsStreamVirtualMergeMaxMemKb * 1024,
"streamVtableMergeBuf", tsTempDir);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pVtableReadyHandles = taosArrayInit(0, POINTER_BYTES);
QUERY_CHECK_NULL(pInfo->pVtableReadyHandles, code, lino, _error, terrno);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
pInfo->groupId = 0; pInfo->groupId = 0;
@ -1337,6 +1117,9 @@ int32_t createStreamVtableMergeOperatorInfo(SReadHandle* pHandle, SVirtualScanPh
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
// TODO(kjq): save and load fill history state // TODO(kjq): save and load fill history state
code = appendDownstream(pOperator, &pDownstream, 1);
QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return code;

View File

@ -110,6 +110,7 @@ int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t
pDataBlock->info.type = pRetrieve->streamBlockType; pDataBlock->info.type = pRetrieve->streamBlockType;
pDataBlock->info.childId = pReq->upstreamChildId; pDataBlock->info.childId = pReq->upstreamChildId;
pDataBlock->info.id.uid = be64toh(pRetrieve->useconds);
} }
pData->blocks = pArray; pData->blocks = pArray;

View File

@ -599,21 +599,43 @@ static void doSendFailedDispatch(SStreamTask* pTask, SDispatchEntry* pEntry, int
SStreamDispatchReq* pReq = pTask->msgInfo.pData; SStreamDispatchReq* pReq = pTask->msgInfo.pData;
int32_t msgId = pTask->msgInfo.msgId; int32_t msgId = pTask->msgInfo.msgId;
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
setResendInfo(pEntry, now); setResendInfo(pEntry, now);
for (int32_t j = 0; j < numOfVgroups; ++j) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
if (pVgInfo == NULL) {
continue;
}
if (pVgInfo->vgId == pEntry->nodeId) { if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet); SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d for %s, msgId:%d, code:%s", int32_t numOfVgroups = taosArrayGetSize(vgInfo);
pTask->id.idStr, pTask->info.selfChildId, pReq[j].blockNum, pVgInfo->vgId, pMsg, msgId, tstrerror(code));
break; for (int32_t j = 0; j < numOfVgroups; ++j) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
if (pVgInfo == NULL) {
continue;
}
if (pVgInfo->vgId == pEntry->nodeId) {
int32_t code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet);
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d for %s, msgId:%d, code:%s",
pTask->id.idStr, pTask->info.selfChildId, pReq[j].blockNum, pVgInfo->vgId, pMsg, msgId,
tstrerror(code));
break;
}
}
} else if (pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) {
SArray *pTaskInfos = pTask->outputInfo.vtableMapDispatcher.taskInfos;
int32_t numOfTasks = taosArrayGetSize(pTaskInfos);
for (int32_t j = 0; j < numOfTasks; ++j) {
STaskDispatcherFixed *pAddr = taosArrayGet(pTaskInfos, j);
if (pAddr == NULL) {
continue;
}
if (pAddr->nodeId == pEntry->nodeId) {
int32_t code = doSendDispatchMsg(pTask, &pReq[j], pAddr->nodeId, &pAddr->epSet);
stDebug("s-task:%s (child taskId:%d) vtable-map-dispatch blocks:%d to vgId:%d for %s, msgId:%d, code:%s",
pTask->id.idStr, pTask->info.selfChildId, pReq[j].blockNum, pAddr->nodeId, pMsg, msgId,
tstrerror(code));
break;
}
} }
} }
} }
@ -636,9 +658,10 @@ static int32_t sendFailedDispatchData(SStreamTask* pTask, int64_t now) {
int32_t msgId = pMsgInfo->msgId; int32_t msgId = pMsgInfo->msgId;
SStreamDispatchReq* pReq = pTask->msgInfo.pData; SStreamDispatchReq* pReq = pTask->msgInfo.pData;
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) {
stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id, pTask->info.selfChildId, const char *taskType = (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) ? "shuffle" : "vtable-map";
msgId); stDebug("s-task:%s (child taskId:%d) retry %s-dispatch to down streams, msgId:%d", id, pTask->info.selfChildId,
taskType, msgId);
int32_t numOfRetry = 0; int32_t numOfRetry = 0;
for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) {
@ -672,8 +695,8 @@ static int32_t sendFailedDispatchData(SStreamTask* pTask, int64_t now) {
} }
} }
stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfRetry, stDebug("s-task:%s complete retry %s-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, taskType,
msgId); numOfRetry, msgId);
} else { } else {
int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId; int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId;
SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet; SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
@ -1409,7 +1432,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
} }
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = 0; pRetrieve->useconds = htobe64(pBlock->info.id.uid);
pRetrieve->precision = TSDB_DEFAULT_PRECISION; pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0; pRetrieve->compressed = 0;
pRetrieve->completed = 1; pRetrieve->completed = 1;
@ -1873,7 +1896,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
streamMutexUnlock(&pMsgInfo->lock); streamMutexUnlock(&pMsgInfo->lock);
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) {
if (!allRsp) { if (!allRsp) {
stDebug( stDebug(
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, " "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, "