fix(stream): take ver snapshot for all vgroups before launching stream with fill-history option opened.

This commit is contained in:
Haojun Liao 2024-02-01 11:37:57 +08:00
parent 2716119c92
commit cb2ea4a721
32 changed files with 454 additions and 277 deletions

View File

@ -2408,6 +2408,11 @@ typedef struct SColLocation {
int8_t type;
} SColLocation;
typedef struct SVgroupVer {
int32_t vgId;
int64_t ver;
} SVgroupVer;
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
char sourceDB[TSDB_DB_FNAME_LEN];
@ -2431,6 +2436,7 @@ typedef struct {
int64_t deleteMark;
int8_t igUpdate;
int64_t lastTs;
SArray* pVgroupVerList;
} SCMCreateStreamReq;
typedef struct {

View File

@ -18,7 +18,7 @@
// message process
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart);
int32_t tqStreamOneTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId);
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId);
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored);
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);

View File

@ -126,6 +126,7 @@ typedef enum EFunctionType {
FUNCTION_TYPE_TAGS,
FUNCTION_TYPE_TBUID,
FUNCTION_TYPE_VGID,
FUNCTION_TYPE_VGVER,
// internal function
FUNCTION_TYPE_SELECT_VALUE = 3750,

View File

@ -106,7 +106,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata
const struct SMetaData* pMetaData, SQuery* pQuery);
int32_t qContinueParseSql(SParseContext* pCxt, struct SCatalogReq* pCatalogReq, const struct SMetaData* pMetaData,
SQuery* pQuery);
int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, void** pResRow);
int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, SSDataBlock* pBlock);
void qDestroyParseContext(SParseContext* pCxt);

View File

@ -96,9 +96,7 @@ int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t qTbUidFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t qVgIdFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t qPseudoTagFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
/* Aggregation functions */
int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);

View File

@ -531,7 +531,7 @@ typedef struct SStreamMeta {
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory, int64_t triggerParam,
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);

View File

@ -880,19 +880,21 @@ static bool incompletaFileParsing(SNode* pStmt) {
return QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pStmt) ? false : ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
}
void continuePostSubQuery(SRequestObj* pRequest, TAOS_ROW row) {
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;
int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
if (TSDB_CODE_SUCCESS == code) {
int64_t analyseStart = taosGetTimestampUs();
code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, (void**)row);
code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
}
if (TSDB_CODE_SUCCESS == code) {
code = qContinuePlanPostQuery(pRequest->pPostPlan);
}
nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
handleQueryAnslyseRes(pWrapper, NULL, code);
}
@ -916,6 +918,43 @@ void returnToUser(SRequestObj* pRequest) {
}
}
static SSDataBlock* createResultBlock(TAOS_RES* pRes, int32_t numOfRows) {
int64_t lastTs = 0;
TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
SSDataBlock* pBlock = createDataBlock();
for(int32_t i = 0; i < numOfFields; ++i) {
SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
blockDataAppendColInfo(pBlock, &colInfoData);
}
blockDataEnsureCapacity(pBlock, numOfRows);
for (int32_t i = 0; i < numOfRows; ++i) {
TAOS_ROW pRow = taos_fetch_row(pRes);
int64_t ts = *(int64_t*)pRow[0];
if (lastTs < ts) {
lastTs = ts;
}
for(int32_t j = 0; j < numOfFields; ++j) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, j);
colDataSetVal(pColInfoData, i, pRow[j], false);
}
tscDebug("lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1], *(int64_t*)pRow[2]);
}
pBlock->info.window.ekey = lastTs;
pBlock->info.rows = numOfRows;
tscDebug("lastKey:%"PRId64" numOfRows:%d from all vgroups", lastTs, numOfRows);
return pBlock;
}
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
SRequestObj* pRequest = (SRequestObj*)res;
if (pRequest->code) {
@ -923,14 +962,10 @@ void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
return;
}
TAOS_ROW row = NULL;
if (rowNum > 0) {
row = taos_fetch_row(res); // for single row only now
}
SSDataBlock* pBlock = createResultBlock(res, rowNum);
SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
if (pNextReq) {
continuePostSubQuery(pNextReq, row);
continuePostSubQuery(pNextReq, pBlock);
releaseRequest(pRequest->relation.nextRefId);
} else {
tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pRequest->self,

View File

@ -829,7 +829,10 @@ TEST(clientCase, projection_query_tables) {
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int, f varchar(4096)) tags(a int)");
// TAOS_RES* pRes = taos_query(pConn, "select tbname, last(ts) from abc1.stable_1 group by tbname");
// taos_free_result(pRes);
pRes = taos_query(pConn, "create stream stream_1 trigger at_once fill_history 1 ignore expired 0 into str_res1 as select _wstart as ts, count(*) from stable_1 interval(10s);");
if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
}

View File

@ -7217,6 +7217,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI8(&encoder, pReq->createStb) < 0) return -1;
if (tEncodeU64(&encoder, pReq->targetStbUid) < 0) return -1;
if (tEncodeI32(&encoder, taosArrayGetSize(pReq->fillNullCols)) < 0) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pReq->fillNullCols); ++i) {
SColLocation *pCol = taosArrayGet(pReq->fillNullCols, i);
@ -7224,10 +7225,19 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI16(&encoder, pCol->colId) < 0) return -1;
if (tEncodeI8(&encoder, pCol->type) < 0) return -1;
}
if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igUpdate) < 0) return -1;
if (tEncodeI64(&encoder, pReq->lastTs) < 0) return -1;
if (tEncodeI32(&encoder, taosArrayGetSize(pReq->pVgroupVerList)) < 0) return -1;
for(int32_t i = 0; i < taosArrayGetSize(pReq->pVgroupVerList); ++i) {
SVgroupVer* p = taosArrayGet(pReq->pVgroupVerList, i);
if (tEncodeI32(&encoder, p->vgId) < 0) return -1;
if (tEncodeI64(&encoder, p->ver) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -7238,6 +7248,8 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) {
int32_t sqlLen = 0;
int32_t astLen = 0;
int32_t numOfFillNullCols = 0;
int32_t numOfVgVer = 0;
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
@ -7289,7 +7301,6 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
}
if (tDecodeI8(&decoder, &pReq->createStb) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->targetStbUid) < 0) return -1;
int32_t numOfFillNullCols = 0;
if (tDecodeI32(&decoder, &numOfFillNullCols) < 0) return -1;
if (numOfFillNullCols > 0) {
pReq->fillNullCols = taosArrayInit(numOfFillNullCols, sizeof(SColLocation));
@ -7314,9 +7325,28 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
if (tDecodeI8(&decoder, &pReq->igUpdate) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->lastTs) < 0) return -1;
tEndDecode(&decoder);
if (tDecodeI32(&decoder, &numOfVgVer) < 0) return -1;
if (numOfVgVer > 0) {
pReq->pVgroupVerList = taosArrayInit(numOfVgVer, sizeof(SVgroupVer));
if (pReq->pVgroupVerList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfVgVer; ++i) {
SVgroupVer v = {0};
if (tDecodeI32(&decoder, &v.vgId) < 0) return -1;
if (tDecodeI64(&decoder, &v.ver) < 0) return -1;
if (taosArrayPush(pReq->pVgroupVerList, &v) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}

View File

@ -27,7 +27,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, int64_t deleteMark);
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindowSkey);
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t skey, SArray* pVerList);
#ifdef __cplusplus
}

View File

@ -235,35 +235,71 @@ int32_t doAddShuffleSinkTask(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStr
int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup,
SEpSet* pEpset, bool isFillhistory) {
int64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, isFillhistory, 0, pTaskList, pStream->conf.fillHistory);
SStreamTask* pTask =
tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, pTaskList, pStream->conf.fillHistory);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
epsetAssign(&(pTask)->info.mnodeEpset, pEpset);
pTask->info.nodeId = vgId;
pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
mndSetSinkTaskInfo(pStream, pTask);
return 0;
}
static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SVgroupVer* pVer = taosArrayGet(pList, i);
if (pVer->vgId == vgId) {
return pVer->ver;
}
}
mError("failed to find the vgId:%d for extract last version", vgId);
return -1;
}
static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) {
int64_t latestVer = getVgroupLastVer(pVerList, vgId);
if (latestVer < 0) {
latestVer = 0;
}
// set the correct ts, which is the last key of queried table.
SDataRange* pRange = &pTask->dataRange;
STimeWindow* pWindow = &pRange->window;
if (pTask->info.fillHistory) {
pWindow->skey = INT64_MIN;
pWindow->ekey = skey - 1;
pRange->range.minVer = 0;
pRange->range.maxVer = latestVer;
mDebug("add fill-history source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64,
pTask->id.taskId, pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
} else {
pWindow->skey = skey;
pWindow->ekey = INT64_MAX;
pRange->range.minVer = latestVer + 1;
pRange->range.maxVer = INT64_MAX;
mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64,
pTask->id.taskId, pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
}
}
static int32_t addSourceTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList,
SStreamObj* pStream, SSubplan* plan, uint64_t uid, SEpSet* pEpset, bool fillHistory,
bool hasExtraSink, int64_t nextWindowSkey, bool hasFillHistory) {
SStreamTask* pTask =
tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, pStream->conf.triggerParam, pTaskList, hasFillHistory);
SStreamObj* pStream, SSubplan* plan, uint64_t uid, SEpSet* pEpset, int64_t skey,
SArray* pVerList, bool fillHistory, bool hasExtraSink, bool hasFillHistory) {
int64_t t = pStream->conf.triggerParam;
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, fillHistory, t, pTaskList, hasFillHistory);
if (pTask == NULL) {
return terrno;
}
epsetAssign(&pTask->info.mnodeEpset, pEpset);
STimeWindow* pWindow = &pTask->dataRange.window;
pWindow->skey = INT64_MIN;
pWindow->ekey = nextWindowSkey - 1;
mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pWindow->skey, pWindow->ekey);
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
// sink or dispatch
if (hasExtraSink) {
@ -308,7 +344,7 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
}
static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* pPlan, SStreamObj* pStream,
SEpSet* pEpset, bool hasExtraSink, int64_t nextWindowSkey) {
SEpSet* pEpset, bool hasExtraSink, int64_t skey, SArray* pVerList) {
// create exec stream task, since only one level, the exec task is also the source task
SArray* pTaskList = addNewTaskList(pStream->tasks);
SSdb* pSdb = pMnode->pSdb;
@ -345,8 +381,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
// new stream task
SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL);
int32_t code = addSourceTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, pEpset,
false, hasExtraSink, nextWindowSkey, pStream->conf.fillHistory);
int32_t code = addSourceTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, pEpset, skey,
pVerList, false, hasExtraSink, pStream->conf.fillHistory);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
return -1;
@ -354,8 +390,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
if (pStream->conf.fillHistory) {
SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL);
code = addSourceTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid,
pEpset, true, hasExtraSink, nextWindowSkey, true);
code = addSourceTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, pEpset, skey,
pVerList, true, hasExtraSink, true);
}
sdbRelease(pSdb, pVgroup);
@ -371,24 +407,17 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
return TSDB_CODE_SUCCESS;
}
static int32_t doAddSourceTask(SArray* pTaskList, bool isFillhistory, int64_t uid, SStreamTask* pDownstreamTask,
SMnode* pMnode, SSubplan* pPlan, SVgObj* pVgroup, SEpSet* pEpset,
int64_t nextWindowSkey, bool hasFillHistory) {
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, isFillhistory, 0, pTaskList, hasFillHistory);
static int32_t addSourceTaskForMultiLevelStream(SArray* pTaskList, bool isFillhistory, int64_t uid,
SStreamTask* pDownstreamTask, SMnode* pMnode, SSubplan* pPlan,
SVgObj* pVgroup, SEpSet* pEpset, int64_t skey, SArray* pVerList,
bool hasFillHistory) {
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, 0, pTaskList, hasFillHistory);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
epsetAssign(&(pTask)->info.mnodeEpset, pEpset);
// set the correct ts, which is the last key of queried table.
STimeWindow* pWindow = &pTask->dataRange.window;
pWindow->skey = INT64_MIN;
pWindow->ekey = nextWindowSkey - 1;
mDebug("s-task:0x%x level:%d set time window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->info.taskLevel,
pWindow->skey, pWindow->ekey);
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
// all the source tasks dispatch result to a single agg node.
streamTaskSetFixedDownstreamInfo(pTask, pDownstreamTask);
@ -401,14 +430,12 @@ static int32_t doAddSourceTask(SArray* pTaskList, bool isFillhistory, int64_t ui
static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream,
SEpSet* pEpset, bool fillHistory, SStreamTask** pAggTask, bool hasFillhistory) {
*pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, fillHistory, pStream->conf.triggerParam, pTaskList, hasFillhistory);
*pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, fillHistory, pStream->conf.triggerParam, pTaskList, hasFillhistory);
if (*pAggTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
epsetAssign(&(*pAggTask)->info.mnodeEpset, pEpset);
// dispatch
if (mndAddDispatcherForInternalTask(pMnode, pStream, pSinkNodeList, *pAggTask) < 0) {
return -1;
@ -492,7 +519,7 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan
static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPlan, SStreamObj* pStream,
SStreamTask* pDownstreamTask, SStreamTask* pHDownstreamTask,
SEpSet* pEpset, int64_t nextWindowSkey) {
SEpSet* pEpset, int64_t skey, SArray* pVerList) {
SArray* pSourceTaskList = addNewTaskList(pStream->tasks);
SArray* pHSourceTaskList = NULL;
@ -521,8 +548,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
continue;
}
int32_t code = doAddSourceTask(pSourceTaskList, false, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, pEpset,
nextWindowSkey, pStream->conf.fillHistory);
int32_t code = addSourceTaskForMultiLevelStream(pSourceTaskList, false, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, pEpset,
skey, pVerList, pStream->conf.fillHistory);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
terrno = code;
@ -530,8 +557,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
}
if (pStream->conf.fillHistory) {
code = doAddSourceTask(pHSourceTaskList, true, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup, pEpset,
nextWindowSkey, pStream->conf.fillHistory);
code = addSourceTaskForMultiLevelStream(pHSourceTaskList, true, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup, pEpset,
skey, pVerList, pStream->conf.fillHistory);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
return code;
@ -580,7 +607,8 @@ static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpst
}
}
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) {
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, int64_t skey,
SArray* pVerList) {
SSdb* pSdb = pMnode->pSdb;
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
@ -637,15 +665,15 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
}
// source level
return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, pEpset, nextWindowSkey);
return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, pEpset, skey, pVerList);
} else if (numOfPlanLevel == 1) {
return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, pEpset, hasExtraSink, nextWindowSkey);
return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, pEpset, hasExtraSink, skey, pVerList);
}
return 0;
}
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindowSkey) {
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t skey, SArray* pVgVerList) {
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
if (pPlan == NULL) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
@ -655,7 +683,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindo
SEpSet mnodeEpset = {0};
mndGetMnodeEpSet(pMnode, &mnodeEpset);
int32_t code = doScheduleStream(pStream, pMnode, pPlan, nextWindowSkey, &mnodeEpset);
int32_t code = doScheduleStream(pStream, pMnode, pPlan, &mnodeEpset, skey, pVgVerList);
qDestroyQueryPlan(pPlan);
return code;

View File

@ -638,7 +638,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER;
if (mndScheduleStream(pMnode, &streamObj, 1685959190000) != 0) goto _OVER;
if (mndScheduleStream(pMnode, &streamObj, 1685959190000, NULL) != 0) goto _OVER;
if (mndPersistStream(pTrans, &streamObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;

View File

@ -649,8 +649,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
return -1;
}
SCMCreateStreamReq createStreamReq = {0};
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) {
SCMCreateStreamReq createReq = {0};
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
@ -659,17 +659,17 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
terrno = TSDB_CODE_MND_INVALID_PLATFORM;
goto _OVER;
#endif
mInfo("stream:%s, start to create, sql:%s", createStreamReq.name, createStreamReq.sql);
mInfo("stream:%s, start to create, sql:%s", createReq.name, createReq.sql);
if (mndCheckCreateStreamReq(&createStreamReq) != 0) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
if (mndCheckCreateStreamReq(&createReq) != 0) {
mError("stream:%s, failed to create since %s", createReq.name, terrstr());
goto _OVER;
}
pStream = mndAcquireStream(pMnode, createStreamReq.name);
pStream = mndAcquireStream(pMnode, createReq.name);
if (pStream != NULL) {
if (createStreamReq.igExists) {
mInfo("stream:%s, already exist, ignore exist is set", createStreamReq.name);
if (createReq.igExists) {
mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
goto _OVER;
} else {
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
@ -679,16 +679,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
if (createStreamReq.sql != NULL) {
sqlLen = strlen(createStreamReq.sql);
if (createReq.sql != NULL) {
sqlLen = strlen(createReq.sql);
sql = taosMemoryMalloc(sqlLen + 1);
memset(sql, 0, sqlLen + 1);
memcpy(sql, createStreamReq.sql, sqlLen);
memcpy(sql, createReq.sql, sqlLen);
}
// build stream obj from request
if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createStreamReq) < 0) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq) < 0) {
mError("stream:%s, failed to create since %s", createReq.name, terrstr());
goto _OVER;
}
@ -702,23 +702,23 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
}
// create stb for stream
if (createStreamReq.createStb == STREAM_CREATE_STABLE_TRUE &&
if (createReq.createStb == STREAM_CREATE_STABLE_TRUE &&
mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr());
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
}
// schedule stream task for stream obj
if (mndScheduleStream(pMnode, &streamObj, createStreamReq.lastTs) < 0) {
mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr());
if (mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList) < 0) {
mError("stream:%s, failed to schedule since %s", createReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
}
// add stream to trans
if (mndPersistStream(pTrans, &streamObj) < 0) {
mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr());
mError("stream:%s, failed to schedule since %s", createReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
}
@ -749,10 +749,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
taosThreadMutexUnlock(&execInfo.lock);
SName dbname = {0};
tNameFromString(&dbname, createStreamReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
SName name = {0};
tNameFromString(&name, createStreamReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
// reuse this function for stream
if (sql != NULL && sqlLen > 0) {
@ -765,11 +765,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
_OVER:
if (terrno != TSDB_CODE_SUCCESS && terrno != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
mError("stream:%s, failed to create since %s", createReq.name, terrstr());
}
mndReleaseStream(pMnode, pStream);
tFreeSCMCreateStreamReq(&createStreamReq);
tFreeSCMCreateStreamReq(&createReq);
tFreeStreamObj(&streamObj);
if (sql != NULL) {
taosMemoryFreeClear(sql);

View File

@ -738,10 +738,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
if (pTask->pState == NULL) {
tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId);
return -1;
} else {
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
if (pTask->info.fillHistory) {
restoreStreamTaskId(pTask, &taskId);
}
@ -846,17 +845,19 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
if (pTask->info.fillHistory) {
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms",
" child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64
" ms, inputVer:%" PRId64,
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
(int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam);
(int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam, nextProcessVer);
} else {
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms",
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64
" ms, inputVer:%" PRId64,
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam);
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam, nextProcessVer);
}
return 0;
@ -878,10 +879,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
const char* id = pTask->id.idStr;
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
// if it's an source task, extract the last version in wal.
SVersionRange* pRange = &pTask->dataRange.range;
// if it's an source task, extract the last version in wal.
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
pTask->execInfo.step2Start = taosGetTimestampMs();

View File

@ -49,7 +49,7 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
return 0;
}
int32_t tqStreamOneTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
@ -547,7 +547,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
streamMetaWUnLock(pMeta);
if (code < 0) {
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks,
tqError("failed to add s-task:0x%x into vgId:%d meta, existed:%d, code:%s", vgId, taskId, numOfTasks,
tstrerror(code));
tFreeStreamTask(pTask);
return code;
@ -562,9 +562,10 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
if (restored) {
SStreamTask* p = streamMetaAcquireTask(pMeta, streamId, taskId);
if (p != NULL && (p->info.fillHistory == 0)) {
tqStreamOneTaskStartAsync(pMeta, cb, streamId, taskId);
if ((p != NULL) && (p->info.fillHistory == 0)) {
tqStreamStartOneTaskAsync(pMeta, cb, streamId, taskId);
}
if (p != NULL) {
streamMetaReleaseTask(pMeta, p);
}

View File

@ -2294,6 +2294,7 @@ void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInf
pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
pResBlock->info.dataLoad = 1;
pResBlock->info.version = pReader->info.verRange.maxVer;
blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
setComposedBlockFlag(pReader, true);
@ -2799,6 +2800,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
pInfo->rows = pBlockInfo->numRow;
pInfo->id.uid = pScanInfo->uid;
pInfo->dataLoad = 0;
pInfo->version = pReader->info.verRange.maxVer;
pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey};
setComposedBlockFlag(pReader, false);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order);

View File

@ -279,7 +279,6 @@ typedef struct STableScanInfo {
int8_t assignBlockUid;
uint8_t countState; // empty table count state
bool hasGroupByTag;
bool countOnly;
bool filesetDelimited;
bool needCountEmptyTable;
} STableScanInfo;
@ -757,10 +756,12 @@ extern void doDestroyExchangeOperatorInfo(void* param);
int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
int32_t rows, const char* idStr, STableMetaCacheInfo* pCache);
int32_t rows, SExecTaskInfo* pTask, STableMetaCacheInfo* pCache);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name);
void setVgIdColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int32_t vgId);
void setVgVerColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int64_t vgVer);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput);

View File

@ -250,7 +250,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pInfo->pseudoExprSup;
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes,
pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
pRes->info.rows, pTaskInfo, NULL);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
@ -313,7 +313,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
if (taosArrayGetSize(pInfo->pUidList) > 0) {
pInfo->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
pInfo->pRes->info.rows, pTaskInfo, NULL);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;

View File

@ -287,7 +287,6 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
terrno = TSDB_CODE_SUCCESS;
int32_t num = 0;
uint64_t groupId = 0;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
if (!pInfo->isInit) {
@ -478,20 +477,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
#if 0
if(pOperator->fpSet.encodeResultRow){
char *result = NULL;
int32_t length = 0;
pOperator->fpSet.encodeResultRow(pOperator, &result, &length);
SAggSupporter* pSup = &pInfo->aggSup;
taosHashClear(pSup->pResultRowHashTable);
pInfo->binfo.resultRowInfo.size = 0;
pOperator->fpSet.decodeResultRow(pOperator, result);
if(result){
taosMemoryFree(result);
}
}
#endif
// initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);
if (pGroupResInfo->pRows != NULL) {
taosArrayDestroy(pGroupResInfo->pRows);

View File

@ -21,7 +21,6 @@
#include "querynodes.h"
#include "systable.h"
#include "tname.h"
#include "ttime.h"
#include "tdatablock.h"
#include "tmsg.h"
@ -252,7 +251,7 @@ static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlo
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows,
GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);
pTaskInfo, &pTableScanInfo->metaCache);
// ignore the table not exists error, since this table may have been dropped during the scan procedure.
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
T_LONG_JMP(pTaskInfo->env, code);
@ -482,24 +481,25 @@ static void doSetNullValue(SSDataBlock* pBlock, const SExprInfo* pExpr, int32_t
}
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
int32_t rows, SExecTaskInfo* pTask, STableMetaCacheInfo* pCache) {
int32_t code = 0;
bool freeReader = false;
LRUHandle* h = NULL;
STableCachedVal val = {0};
SMetaReader mr = {0};
const char* idStr = pTask->id.str;
// currently only the tbname pseudo column
if (numOfExpr <= 0) {
return TSDB_CODE_SUCCESS;
}
int32_t code = 0;
bool freeReader = false;
// todo: opt if only require the vgId and the vgVer;
// backup the rows
int32_t backupRows = pBlock->info.rows;
pBlock->info.rows = rows;
STableCachedVal val = {0};
SMetaReader mr = {0};
LRUHandle* h = NULL;
// todo refactor: extract method
// the handling of the null data should be packed in the extracted method
@ -586,7 +586,14 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
// this is to handle the tbname
if (fmIsScanPseudoColumnFunc(functionId)) {
int32_t fType = pExpr1->pExpr->_function.functionType;
if (fType == FUNCTION_TYPE_TBNAME) {
setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
} else if (fType == FUNCTION_TYPE_VGID) {
setVgIdColData(pBlock, pColInfoData, functionId, pTask->id.vgId);
} else if (fType == FUNCTION_TYPE_VGVER) {
setVgVerColData(pBlock, pColInfoData, functionId, pBlock->info.version);
}
} else { // these are tags
STagVal tagVal = {0};
tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
@ -655,6 +662,47 @@ void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData,
colDataDestroy(&infoData);
}
void setVgIdColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int32_t vgId) {
struct SScalarFuncExecFuncs fpSet = {0};
fmGetScalarFuncExecFuncs(functionId, &fpSet);
SColumnInfoData infoData = createColumnInfoData(pColInfoData->info.type, pColInfoData->info.bytes, 1);
colInfoDataEnsureCapacity(&infoData, 1, false);
colDataSetVal(&infoData, 0, (const char*)&vgId, false);
SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .columnData = &infoData};
SScalarParam param = {.columnData = pColInfoData};
if (fpSet.process != NULL) {
fpSet.process(&srcParam, 1, &param);
} else {
qError("failed to get the corresponding callback function, functionId:%d", functionId);
}
colDataDestroy(&infoData);
}
void setVgVerColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int64_t vgVer) {
struct SScalarFuncExecFuncs fpSet = {0};
fmGetScalarFuncExecFuncs(functionId, &fpSet);
SColumnInfoData infoData = createColumnInfoData(pColInfoData->info.type, pColInfoData->info.bytes, 1);
colInfoDataEnsureCapacity(&infoData, 1, false);
colDataSetVal(&infoData, 0, (const char*)&vgVer, false);
SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .columnData = &infoData};
SScalarParam param = {.columnData = pColInfoData};
if (fpSet.process != NULL) {
fpSet.process(&srcParam, 1, &param);
} else {
qError("failed to get the corresponding callback function, functionId:%d", functionId);
}
colDataDestroy(&infoData);
}
static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, int32_t* size) {
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size);
@ -963,13 +1011,11 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
STableKeyInfo* pList = NULL;
if (pInfo->currentGroupId == -1) {
int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
setOperatorCompleted(pOperator);
return NULL;
}
initNextGroupScan(pInfo, &pList, &num);
ASSERT(pInfo->base.dataReader == NULL);
@ -1172,10 +1218,6 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
goto _error;
}
if (scanDebug) {
pInfo->countOnly = true;
}
pInfo->filesetDelimited = pTableScanNode->filesetDelimited;
taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
@ -1901,7 +1943,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
// currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) {
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
pBlockInfo->rows, id, &pTableScanInfo->base.metaCache);
pBlockInfo->rows, pTaskInfo, &pTableScanInfo->base.metaCache);
// ignore the table not exists error, since this table may have been dropped during the scan procedure.
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
blockDataFreeRes((SSDataBlock*)pBlock);
@ -2134,7 +2176,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow;
qDebug("stream recover step1, verRange:%" PRId64 "-%" PRId64 " window:%"PRId64"-%"PRId64", %s", pTSInfo->base.cond.startVersion,
qDebug("stream scan step1, verRange:%" PRId64 "-%" PRId64 " window:%"PRId64"-%"PRId64", %s", pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id);
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN1;
pStreamInfo->recoverScanFinished = false;

View File

@ -707,18 +707,20 @@ static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_
static int32_t translateTbUidColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// pseudo column do not need to check parameters
pFunc->node.resType =
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
return TSDB_CODE_SUCCESS;
}
static int32_t translateVgIdColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// pseudo column do not need to check parameters
pFunc->node.resType =
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes, .type = TSDB_DATA_TYPE_INT};
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes, .type = TSDB_DATA_TYPE_INT};
return TSDB_CODE_SUCCESS;
}
static int32_t translateVgVerColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
return TSDB_CODE_SUCCESS;
}
static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
@ -3453,7 +3455,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.translateFunc = translateTbnameColumn,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = qTbnameFunction,
.sprocessFunc = qPseudoTagFunction,
.finalizeFunc = NULL
},
{
@ -3740,11 +3742,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.translateFunc = translateTbUidColumn,
.getEnvFunc = NULL,
.initFunc = NULL,
#ifdef BUILD_NO_CALL
.sprocessFunc = qTbUidFunction,
#else
.sprocessFunc = NULL,
#endif
.sprocessFunc = qPseudoTagFunction,
.finalizeFunc = NULL
},
{
@ -3754,11 +3752,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.translateFunc = translateVgIdColumn,
.getEnvFunc = NULL,
.initFunc = NULL,
#ifdef BUILD_NO_CALL
.sprocessFunc = qVgIdFunction,
#else
.sprocessFunc = NULL,
#endif
.sprocessFunc = qPseudoTagFunction,
.finalizeFunc = NULL
},
{
@ -3781,6 +3775,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.sprocessFunc = toCharFunction,
.finalizeFunc = NULL
},
{
.name = "_vgver",
.type = FUNCTION_TYPE_VGVER,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
.translateFunc = translateVgVerColumn,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = qPseudoTagFunction,
.finalizeFunc = NULL
}
};
// clang-format on

View File

@ -31,7 +31,7 @@ typedef struct SNodeMemChunk {
struct SNodeMemChunk* pNext;
} SNodeMemChunk;
typedef struct SNodeAllocator {
struct SNodeAllocator {
int64_t self;
int64_t queryId;
int32_t chunkSize;
@ -39,7 +39,7 @@ typedef struct SNodeAllocator {
SNodeMemChunk* pCurrChunk;
SNodeMemChunk* pChunks;
TdThreadMutex mutex;
} SNodeAllocator;
};
static threadlocal SNodeAllocator* g_pNodeAllocator;
static int32_t g_allocatorReqRefPool = -1;

View File

@ -35,8 +35,8 @@ int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache*
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache);
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery);
int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow);
int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow);
int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, SSDataBlock* pBlock);
int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, SSDataBlock* pBlock);
int32_t buildQueryAfterParse(SQuery** pQuery, SNode* pRootNode, int16_t placeholderNo, SArray** pPlaceholderValues);
int32_t translateTable(STranslateContext* pCxt, SNode** pTable);
int32_t getMetaDataFromHash(const char* pKey, int32_t len, SHashObj* pHash, void** pOutput);

View File

@ -14,6 +14,7 @@
*/
#include "parTranslater.h"
#include "tdatablock.h"
#include "parInt.h"
#include "catalog.h"
@ -6957,7 +6958,7 @@ int32_t createIntervalFromCreateSmaIndexStmt(SCreateIndexStmt* pStmt, SInterval*
return TSDB_CODE_SUCCESS;
}
int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow) {
int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pQuery->pRoot;
int64_t lastTs = 0;
@ -6967,11 +6968,13 @@ int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, vo
if (TSDB_CODE_SUCCESS == code) {
code = createIntervalFromCreateSmaIndexStmt(pStmt, &interval);
}
if (TSDB_CODE_SUCCESS == code) {
if (pResRow && pResRow[0]) {
lastTs = *(int64_t*)pResRow[0];
if (pBlock != NULL && pBlock->info.rows >= 1) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
lastTs = *(int64_t*)colDataGetData(pColInfo, 0);
} else if (interval.interval > 0) {
lastTs = convertTimePrecision(taosGetTimestampMs(), TSDB_TIME_PRECISION_MILLI, interval.precision);
lastTs = taosGetTimestamp(interval.precision);
} else {
lastTs = taosGetTimestampMs();
}
@ -8110,18 +8113,63 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
nodesDestroyList(pParamterList);
return TSDB_CODE_OUT_OF_MEMORY;
}
code = nodesListStrictAppend(pProjectionList, pFunc);
if (code) {
nodesDestroyList(pProjectionList);
return code;
}
SFunctionNode* pFunc1 = createFunction("_vgid", NULL);
if (NULL == pFunc1) {
nodesDestroyList(pParamterList);
return TSDB_CODE_OUT_OF_MEMORY;
}
snprintf(pFunc1->node.aliasName, sizeof(pFunc1->node.aliasName), "%s.%p", pFunc1->functionName, pFunc1);
code = nodesListStrictAppend(pProjectionList, (SNode*) pFunc1);
if (code) {
nodesDestroyList(pProjectionList);
return code;
}
SFunctionNode* pFunc2 = createFunction("_vgver", NULL);
if (NULL == pFunc2) {
nodesDestroyList(pParamterList);
return TSDB_CODE_OUT_OF_MEMORY;
}
snprintf(pFunc2->node.aliasName, sizeof(pFunc2->node.aliasName), "%s.%p", pFunc2->functionName, pFunc2);
code = nodesListStrictAppend(pProjectionList, (SNode*) pFunc2);
if (code) {
nodesDestroyList(pProjectionList);
return code;
}
code = createSimpleSelectStmtFromProjList(pDb, pTable, pProjectionList, (SSelectStmt**)pQuery);
if (code) {
nodesDestroyList(pProjectionList);
return code;
}
// todo add the group by statement
SSelectStmt** pSelect1 = (SSelectStmt**)pQuery;
(*pSelect1)->pGroupByList = nodesMakeList();
SGroupingSetNode* pNode1 = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET);
pNode1->groupingSetType = GP_TYPE_NORMAL;
pNode1->pParameterList = nodesMakeList();
nodesListAppend(pNode1->pParameterList, (SNode*)pFunc1);
nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode1);
SGroupingSetNode* pNode2 = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET);
pNode2->groupingSetType = GP_TYPE_NORMAL;
pNode2->pParameterList = nodesMakeList();
nodesListAppend(pNode2->pParameterList, (SNode*)pFunc2);
nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode2);
return code;
}
@ -8269,7 +8317,32 @@ static int32_t buildIntervalForCreateStream(SCreateStreamStmt* pStmt, SInterval*
return code;
}
int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow) {
// ts, vgroup_id, vgroup_version
static int32_t createStreamReqVersionInfo(SSDataBlock* pBlock, SArray** pArray, int64_t* lastTs, SInterval* pInterval) {
*pArray = taosArrayInit(pBlock->info.rows, sizeof(SVgroupVer));
if (*pArray == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pBlock->info.rows > 0) {
*lastTs = pBlock->info.window.ekey;
SColumnInfoData* pCol1 = taosArrayGet(pBlock->pDataBlock, 1);
SColumnInfoData* pCol2 = taosArrayGet(pBlock->pDataBlock, 2);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
SVgroupVer v = {.vgId = *(int32_t*)colDataGetData(pCol1, i), .ver = *(int64_t*)colDataGetData(pCol2, i)};
parserDebug("-------------%ld, vgId:%d, vgVer:%ld\n", *lastTs, v.vgId, v.ver);
taosArrayPush(*pArray, &v);
}
} else {
int32_t precision = (pInterval->interval > 0)? pInterval->precision:TSDB_TIME_PRECISION_MILLI;
*lastTs = taosGetTimestamp(precision);
}
return TSDB_CODE_SUCCESS;
}
int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, SSDataBlock* pBlock) {
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pQuery->pRoot;
STranslateContext cxt = {0};
SInterval interval = {0};
@ -8279,15 +8352,11 @@ int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void
if (TSDB_CODE_SUCCESS == code) {
code = buildIntervalForCreateStream(pStmt, &interval);
}
if (TSDB_CODE_SUCCESS == code) {
if (pResRow && pResRow[0]) {
lastTs = *(int64_t*)pResRow[0];
} else if (interval.interval > 0) {
lastTs = convertTimePrecision(taosGetTimestampMs(), TSDB_TIME_PRECISION_MILLI, interval.precision);
} else {
lastTs = taosGetTimestampMs();
}
code = createStreamReqVersionInfo(pBlock, &pStmt->pReq->pVgroupVerList, &lastTs, &interval);
}
if (TSDB_CODE_SUCCESS == code) {
if (interval.interval > 0) {
pStmt->pReq->lastTs = taosTimeAdd(taosTimeTruncate(lastTs, &interval), interval.interval, interval.intervalUnit, interval.precision);
@ -8296,9 +8365,11 @@ int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void
}
code = buildCmdMsg(&cxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, pStmt->pReq);
}
if (TSDB_CODE_SUCCESS == code) {
code = setQuery(&cxt, pQuery);
}
setRefreshMeta(&cxt, pQuery);
destroyTranslateContext(&cxt);

View File

@ -233,14 +233,17 @@ int32_t qContinueParseSql(SParseContext* pCxt, struct SCatalogReq* pCatalogReq,
return parseInsertSql(pCxt, &pQuery, pCatalogReq, pMetaData);
}
int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, void** pResRow) {
int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pQuery->pRoot)) {
case QUERY_NODE_CREATE_STREAM_STMT:
code = translatePostCreateStream(pCxt, pQuery, pResRow);
case QUERY_NODE_CREATE_STREAM_STMT: {
code = translatePostCreateStream(pCxt, pQuery, pBlock);
break;
case QUERY_NODE_CREATE_INDEX_STMT:
code = translatePostCreateSmaIndex(pCxt, pQuery, pResRow);
}
case QUERY_NODE_CREATE_INDEX_STMT: {
code = translatePostCreateSmaIndex(pCxt, pQuery, pBlock);
break;
}
default:
break;
}

View File

@ -1815,9 +1815,8 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
return TSDB_CODE_SUCCESS;
}
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
char* p = colDataGetVarData(pInput->columnData, 0);
int32_t qPseudoTagFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
char *p = colDataGetData(pInput->columnData, 0);
int32_t code = colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows, true);
if (code) {
return code;
@ -1826,31 +1825,6 @@ int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO
pOutput->numOfRows += pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
#ifdef BUILD_NO_CALL
int32_t qTbUidFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
char* p = colDataGetNumData(pInput->columnData, 0);
int32_t code = colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows, true);
if (code) {
return code;
}
pOutput->numOfRows += pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
int32_t qVgIdFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
char* p = colDataGetNumData(pInput->columnData, 0);
int32_t code = colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows, true);
if (code) {
return code;
}
pOutput->numOfRows += pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
#endif
/** Aggregation functions **/
int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {

View File

@ -123,7 +123,7 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t
STaskId streamTaskGetTaskId(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask);
void streamClearChkptReadyMsg(SStreamTask* pTask);
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,

View File

@ -388,10 +388,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
pTimeWindow->ekey, p, pStreamTask->status.schedStatus);
pTimeWindow->skey = INT64_MIN;
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
stDebug("s-task:%s after exceed the threshold:%" PRId64 " and then update the window filter",
pStreamTask->id.idStr, pStreamTask->dataRange.range.maxVer);
streamTaskResetTimewindowFilter(pStreamTask);
} else {
stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr);
}
@ -420,10 +417,6 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
ASSERT(pTask->status.appendTranstateBlock == 1);
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) {
streamTaskFillHistoryFinished(pTask);
}
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask);
} else { // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.

View File

@ -590,11 +590,6 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
return 0;
}
if (pTask->info.fillHistory == 1) {
stDebug("s-task:0x%x initial nextProcessVer is set to 1 for fill-history task", pTask->id.taskId);
ver = 1;
}
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
tFreeStreamTask(pTask);
return -1;

View File

@ -595,8 +595,6 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask)
SDataRange* pRange = &pHTask->dataRange;
// the query version range should be limited to the already processed data
pRange->range.minVer = 0;
pRange->range.maxVer = pTask->chkInfo.nextProcessVer - 1;
pHTask->execInfo.init = taosGetTimestampMs();
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
@ -843,7 +841,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
}
}
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) {
int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamInfoResetTimewindowFilter(exec);
}
@ -854,8 +852,6 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVe
int64_t walScanStartVer = pRange->maxVer + 1;
if (walScanStartVer > nextProcessVer - 1) {
// no input data yet. no need to execute the secondary scan while stream task halt
streamTaskFillHistoryFinished(pTask);
stDebug(
"s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, "
"related stream task currentVer:%" PRId64,
@ -965,25 +961,12 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
return;
}
int64_t ekey = 0;
if (pRange->window.ekey < INT64_MAX) {
ekey = pRange->window.ekey + 1;
} else {
ekey = pRange->window.ekey;
}
int64_t ver = pRange->range.minVer;
pRange->window.skey = ekey;
pRange->window.ekey = INT64_MAX;
pRange->range.minVer = 0;
pRange->range.maxVer = ver;
stDebug("s-task:%s level:%d related fill-history task exists, set stream task timeWindow:%" PRId64 " - %" PRId64
stDebug("s-task:%s level:%d related fill-history task exists, stream task timeWindow:%" PRId64 " - %" PRId64
", verRang:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, ver, INT64_MAX);
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
pRange->range.maxVer);
SVersionRange verRange = {.minVer = ver, .maxVer = INT64_MAX};
SVersionRange verRange = pRange->range;
STimeWindow win = pRange->window;
streamSetParamForStreamScannerStep2(pTask, &verRange, &win);
}

View File

@ -79,7 +79,7 @@ static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
return pEpInfo;
}
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory, int64_t triggerParam,
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory) {
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
@ -92,6 +92,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory
pTask->ver = SSTREAM_TASK_VER;
pTask->id.taskId = tGenIdPI32();
pTask->id.streamId = streamId;
pTask->info.taskLevel = taskLevel;
pTask->info.fillHistory = fillHistory;
pTask->info.triggerParam = triggerParam;
@ -115,6 +116,8 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory
ASSERT(hasFillhistory);
}
epsetAssign(&(pTask->info.mnodeEpset), pEpset);
addToTaskset(pTaskList, pTask);
return pTask;
}
@ -458,16 +461,18 @@ void tFreeStreamTask(SStreamTask* pTask) {
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
pTask->refCnt = 1;
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.timerActive = 0;
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->inputq.queue = streamQueueOpen(512 << 10);
pTask->outputq.queue = streamQueueOpen(512 << 10);
if (pTask->inputq.queue == NULL || pTask->outputq.queue == NULL) {
stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
return TSDB_CODE_OUT_OF_MEMORY;
}
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.timerActive = 0;
pTask->status.pSM = streamCreateStateMachine(pTask);
if (pTask->status.pSM == NULL) {
stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
@ -476,29 +481,34 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
}
pTask->execInfo.created = taosGetTimestampMs();
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMeta = pMeta;
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
SDataRange* pRange = &pTask->dataRange;
pTask->chkInfo.checkpointVer = ver - 1; // only update when generating checkpoint
pTask->chkInfo.processedVer = ver - 1; // already processed version
// only set the version info for stream tasks without fill-history task
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) {
pChkInfo->checkpointVer = ver - 1; // only update when generating checkpoint
pChkInfo->processedVer = ver - 1; // already processed version
pChkInfo->nextProcessVer = ver; // next processed version
pTask->chkInfo.nextProcessVer = ver; // next processed version
pTask->dataRange.range.maxVer = ver;
pTask->dataRange.range.minVer = ver;
pTask->pMsgCb = pMsgCb;
pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t));
pTask->outputInfo.pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
if (pTask->outputInfo.pTokenBucket == NULL) {
stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr,
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
pRange->range.maxVer = ver;
pRange->range.minVer = ver;
} else {
if (pTask->info.fillHistory == 1) {
pChkInfo->checkpointVer = pRange->range.maxVer;
pChkInfo->processedVer = pRange->range.maxVer;
pChkInfo->nextProcessVer = pRange->range.maxVer + 1;
} else {
pChkInfo->checkpointVer = pRange->range.minVer - 1;
pChkInfo->processedVer = pRange->range.minVer - 1;
pChkInfo->nextProcessVer = pRange->range.minVer;
}
}
}
// 2MiB per second for sink task
// 50 times sink operator per second
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
pTask->pMeta = pMeta;
pTask->pMsgCb = pMsgCb;
pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t));
TdThreadMutexAttr attr = {0};
int code = taosThreadMutexAttrInit(&attr);
@ -514,10 +524,22 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
}
taosThreadMutexInit(&pTask->lock, &attr);
taosThreadMutexAttrDestroy(&attr);
streamTaskOpenAllUpstreamInput(pTask);
pTask->outputInfo.pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
if (pTask->outputInfo.pDownstreamUpdateList == NULL) {
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
pOutputInfo->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
if (pOutputInfo->pTokenBucket == NULL) {
stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr,
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
}
// 2MiB per second for sink task
// 50 times sink operator per second
streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
pOutputInfo->pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
if (pOutputInfo->pDownstreamUpdateList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -527,7 +549,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
return 0;
} else {
}
int32_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__TABLE) {
return 0;
@ -538,7 +561,6 @@ int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
return taosArrayGetSize(vgInfo);
}
}
}
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);

View File

@ -105,7 +105,7 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
pTask->hTaskInfo.haltVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
if (pTask->hTaskInfo.haltVer == -1) {
pTask->hTaskInfo.haltVer = pTask->dataRange.range.maxVer + 1;
pTask->hTaskInfo.haltVer = pTask->dataRange.range.minVer;
}
}