Merge pull request #14898 from taosdata/feature/stream
refactor(stream): destroy stream task
This commit is contained in:
commit
b33d6584c4
|
@ -343,7 +343,7 @@ tmq_list_t* tmq_list_new() {
|
|||
|
||||
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
|
||||
SArray* container = &list->container;
|
||||
char* topic = strDupUnquo(src);
|
||||
char* topic = strdup(src);
|
||||
if (taosArrayPush(container, &topic) == NULL) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -125,7 +125,7 @@ typedef struct SMnode {
|
|||
} SMnode;
|
||||
|
||||
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
|
||||
int64_t mndGenerateUid(char *name, int32_t len);
|
||||
int64_t mndGenerateUid(const char *name, int32_t len);
|
||||
|
||||
int32_t mndAcquireRpcRef(SMnode *pMnode);
|
||||
void mndReleaseRpcRef(SMnode *pMnode);
|
||||
|
|
|
@ -624,7 +624,7 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
|
|||
}
|
||||
|
||||
// Note: uid 0 is reserved
|
||||
int64_t mndGenerateUid(char *name, int32_t len) {
|
||||
int64_t mndGenerateUid(const char *name, int32_t len) {
|
||||
int32_t hashval = MurmurHash3_32(name, len);
|
||||
do {
|
||||
int64_t us = taosGetTimestampUs();
|
||||
|
|
|
@ -479,7 +479,8 @@ static void mndDestroySmaObj(SSmaObj *pSmaObj) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb) {
|
||||
static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb,
|
||||
const char *streamName) {
|
||||
SSmaObj smaObj = {0};
|
||||
memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
memcpy(smaObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN);
|
||||
|
@ -520,12 +521,12 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
}
|
||||
|
||||
SStreamObj streamObj = {0};
|
||||
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||
tstrncpy(streamObj.name, streamName, TSDB_STREAM_FNAME_LEN);
|
||||
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
tstrncpy(streamObj.targetDb, streamObj.sourceDb, TSDB_DB_FNAME_LEN);
|
||||
streamObj.createTime = taosGetTimestampMs();
|
||||
streamObj.updateTime = streamObj.createTime;
|
||||
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||
streamObj.uid = mndGenerateUid(streamName, strlen(streamName));
|
||||
streamObj.sourceDbUid = pDb->uid;
|
||||
streamObj.targetDbUid = pDb->uid;
|
||||
streamObj.version = 1;
|
||||
|
@ -590,7 +591,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
if (pTrans == NULL) goto _OVER;
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
mndTransSetSerial(pTrans);
|
||||
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
|
||||
mDebug("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name);
|
||||
|
||||
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
||||
|
@ -638,6 +639,14 @@ static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void mndGetStreamNameFromSmaName(char *streamName, char *smaName) {
|
||||
SName n;
|
||||
tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
streamName[0] = '1';
|
||||
streamName[1] = '.';
|
||||
strcpy(streamName + 2, tNameGetTableName(&n));
|
||||
}
|
||||
|
||||
static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
int32_t code = -1;
|
||||
|
@ -663,9 +672,12 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
pStream = mndAcquireStream(pMnode, createReq.name);
|
||||
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
mndGetStreamNameFromSmaName(streamName, createReq.name);
|
||||
|
||||
pStream = mndAcquireStream(pMnode, streamName);
|
||||
if (pStream != NULL) {
|
||||
mError("sma:%s, failed to create since stream:%s already exist", createReq.name, createReq.name);
|
||||
mError("sma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
|
||||
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
|
||||
goto _OVER;
|
||||
}
|
||||
|
@ -692,7 +704,7 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
code = mndCreateSma(pMnode, pReq, &createReq, pDb, pStb);
|
||||
code = mndCreateSma(pMnode, pReq, &createReq, pDb, pStb, streamName);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
_OVER:
|
||||
|
@ -789,7 +801,10 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
|
|||
mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
|
||||
SStreamObj *pStream = mndAcquireStream(pMnode, pSma->name);
|
||||
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
mndGetStreamNameFromSmaName(streamName, pSma->name);
|
||||
|
||||
SStreamObj *pStream = mndAcquireStream(pMnode, streamName);
|
||||
if (pStream == NULL || pStream->smaId != pSma->uid) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
goto _OVER;
|
||||
|
@ -838,7 +853,10 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
|
|||
pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
|
||||
if (pVgroup == NULL) goto _OVER;
|
||||
|
||||
SStreamObj *pStream = mndAcquireStream(pMnode, pSma->name);
|
||||
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
mndGetStreamNameFromSmaName(streamName, pSma->name);
|
||||
|
||||
SStreamObj *pStream = mndAcquireStream(pMnode, streamName);
|
||||
if (pStream != NULL && pStream->smaId == pSma->uid) {
|
||||
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
|
||||
|
|
|
@ -235,7 +235,8 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
|
|||
}
|
||||
|
||||
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
|
||||
SNode *pAst = NULL;
|
||||
SNode *pAst = NULL;
|
||||
SQueryPlan *pPlan = NULL;
|
||||
|
||||
mDebug("stream:%s to create", pCreate->name);
|
||||
memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||
|
@ -293,7 +294,6 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
goto FAIL;
|
||||
}
|
||||
|
||||
SQueryPlan *pPlan = NULL;
|
||||
SPlanContext cxt = {
|
||||
.pAstRoot = pAst,
|
||||
.topicQuery = false,
|
||||
|
@ -317,6 +317,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
|
||||
FAIL:
|
||||
if (pAst != NULL) nodesDestroyNode(pAst);
|
||||
if (pPlan != NULL) qDestroyQueryPlan(pPlan);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -541,7 +542,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
// build stream obj from request
|
||||
SStreamObj streamObj = {0};
|
||||
if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createStreamReq) < 0) {
|
||||
ASSERT(0);
|
||||
/*ASSERT(0);*/
|
||||
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
@ -689,7 +690,14 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
|||
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
|
||||
return -1;
|
||||
} else {
|
||||
// TODO drop all task on snode
|
||||
#if 0
|
||||
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
|
||||
sdbRelease(pSdb, pStream);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
|
|
|
@ -214,7 +214,6 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
|
|||
}
|
||||
pRSmaStat->refId = refId;
|
||||
|
||||
|
||||
// init hash
|
||||
RSMA_INFO_HASH(pRSmaStat) = taosHashInit(
|
||||
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||
|
@ -256,6 +255,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
|
|||
|
||||
// step 2: destroy the rsma info and associated fetch tasks
|
||||
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
|
||||
#if 1
|
||||
if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) {
|
||||
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL);
|
||||
while (infoHash) {
|
||||
|
@ -264,6 +264,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
|
|||
infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
taosHashCleanup(RSMA_INFO_HASH(pStat));
|
||||
|
||||
// step 3: wait all triggered fetch tasks finished
|
||||
|
@ -382,4 +383,4 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
|
|||
tdUnLockSma(pSma);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
};
|
||||
};
|
||||
|
|
|
@ -30,7 +30,7 @@ typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
|
|||
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
|
||||
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
|
||||
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
|
||||
SReadHandle *handle, int8_t idx);
|
||||
int8_t idx);
|
||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem,
|
||||
STSchema *pTSchema, tb_uid_t suid, int8_t level);
|
||||
static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid);
|
||||
|
@ -256,14 +256,20 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
|
|||
}
|
||||
|
||||
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
|
||||
SReadHandle *pReadHandle, int8_t idx) {
|
||||
int8_t idx) {
|
||||
SRetention *pRetention = SMA_RETENTION(pSma);
|
||||
STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma);
|
||||
|
||||
SReadHandle handle = {
|
||||
.meta = pSma->pVnode->pMeta,
|
||||
.vnode = pSma->pVnode,
|
||||
.initTqReader = 1,
|
||||
};
|
||||
|
||||
if (param->qmsg[idx]) {
|
||||
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
|
||||
pItem->refId = RSMA_REF_ID(pStat);
|
||||
pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], pReadHandle);
|
||||
pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
|
||||
if (!pItem->taskInfo) {
|
||||
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
|
||||
goto _err;
|
||||
|
@ -299,10 +305,6 @@ _err:
|
|||
* @return int32_t
|
||||
*/
|
||||
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) {
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
SMeta *pMeta = pVnode->pMeta;
|
||||
SMsgCb *pMsgCb = &pVnode->msgCb;
|
||||
|
||||
if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) {
|
||||
smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -331,19 +333,6 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
STqReader *pReader = tqOpenReader(pVnode);
|
||||
if (!pReader) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
SReadHandle handle = {
|
||||
.tqReader = pReader,
|
||||
.meta = pMeta,
|
||||
.pMsgCb = pMsgCb,
|
||||
.vnode = pVnode,
|
||||
};
|
||||
|
||||
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1);
|
||||
if (!pTSchema) {
|
||||
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
||||
|
@ -352,11 +341,11 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
|||
pRSmaInfo->pTSchema = pTSchema;
|
||||
pRSmaInfo->suid = suid;
|
||||
|
||||
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, &handle, 0) < 0) {
|
||||
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, &handle, 1) < 0) {
|
||||
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
|
@ -369,7 +358,6 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
|||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
tdFreeRSmaInfo(pSma, pRSmaInfo);
|
||||
taosMemoryFree(pReader);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
|
@ -404,7 +392,7 @@ int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) {
|
|||
* @param pReq
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
|
||||
int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
if (!VND_IS_RSMA(pVnode)) {
|
||||
smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
|
||||
|
@ -412,11 +400,9 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief store suid/[uids], prefer to use array and then hash
|
||||
|
|
|
@ -82,6 +82,13 @@ void tqClose(STQ* pTq) {
|
|||
if (pTq) {
|
||||
tqOffsetClose(pTq->pOffsetStore);
|
||||
taosHashCleanup(pTq->handles);
|
||||
void* pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
tFreeSStreamTask(pTask);
|
||||
}
|
||||
taosHashCleanup(pTq->pStreamTasks);
|
||||
taosHashCleanup(pTq->pushMgr);
|
||||
taosMemoryFree(pTq->path);
|
||||
|
@ -608,7 +615,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
|
||||
streamSetupTrigger(pTask);
|
||||
|
||||
tqInfo("deploy stream task id %d child id %d on vgId:%d", pTask->taskId, pTask->selfChildId, TD_VID(pTq->pVnode));
|
||||
tqInfo("deploy stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
|
||||
pTask->selfChildId);
|
||||
|
||||
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
|
||||
|
||||
|
@ -634,9 +642,6 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
|
|||
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
|
||||
continue;
|
||||
}
|
||||
if (!pTask->isDataScan) continue;
|
||||
|
||||
if (!failed) {
|
||||
|
@ -665,11 +670,12 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SStreamTaskRunReq* pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
|
||||
if (pTask) {
|
||||
streamProcessRunReq(pTask);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
streamProcessRunReq(pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
|
@ -682,55 +688,62 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
tDecodeStreamDispatchReq(&decoder, &req);
|
||||
int32_t taskId = req.taskId;
|
||||
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
|
||||
if (pTask) {
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessDispatchReq(pTask, &req, &rsp);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessDispatchReq(pTask, &req, &rsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamTaskRecoverReq* pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
|
||||
if (pTask) {
|
||||
streamProcessRecoverReq(pTask, pReq, pMsg);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
streamProcessRecoverReq(pTask, pReq, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t taskId = pRsp->taskId;
|
||||
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
|
||||
if (pTask) {
|
||||
streamProcessDispatchRsp(pTask, pRsp);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
streamProcessDispatchRsp(pTask, pRsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
|
||||
int32_t taskId = pRsp->taskId;
|
||||
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
|
||||
if (pTask) {
|
||||
streamProcessRecoverRsp(pTask, pRsp);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
streamProcessRecoverRsp(pTask, pRsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||
|
||||
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
|
||||
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
|
||||
if (pTask) {
|
||||
taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
|
||||
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
|
||||
}
|
||||
// todo
|
||||
// clear queue
|
||||
// push drop req into queue
|
||||
|
|
|
@ -98,8 +98,21 @@ STqReader* tqOpenReader(SVnode* pVnode) {
|
|||
|
||||
void tqCloseReader(STqReader* pReader) {
|
||||
// close wal reader
|
||||
if (pReader->pWalReader) {
|
||||
walCloseReader(pReader->pWalReader);
|
||||
}
|
||||
// free cached schema
|
||||
if (pReader->pSchema) {
|
||||
taosMemoryFree(pReader->pSchema);
|
||||
}
|
||||
if (pReader->pSchemaWrapper) {
|
||||
tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
|
||||
}
|
||||
if (pReader->pColIdList) {
|
||||
taosArrayDestroy(pReader->pColIdList);
|
||||
}
|
||||
// free hash
|
||||
taosHashCleanup(pReader->tbIdHash);
|
||||
taosMemoryFree(pReader);
|
||||
}
|
||||
|
||||
|
|
|
@ -147,7 +147,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
|||
const STColumn* pColumn = &pTSchema->columns[k];
|
||||
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||
if (colDataIsNull_s(pColData, j)) {
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, pColumn->offset, k);
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
|
||||
} else {
|
||||
void* data = colDataGetData(pColData, j);
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
|
||||
|
|
|
@ -147,7 +147,6 @@ typedef struct {
|
|||
SSDataBlock* pullOverBlk; // for streaming
|
||||
SWalFilterCond cond;
|
||||
int64_t lastScanUid;
|
||||
SStreamQueue* inputQueue;
|
||||
} SStreamTaskInfo;
|
||||
|
||||
typedef struct SExecTaskInfo {
|
||||
|
|
|
@ -44,13 +44,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
|
|||
goto _error;
|
||||
}
|
||||
|
||||
if (model == OPTR_EXEC_MODEL_STREAM) {
|
||||
(*pTask)->streamInfo.inputQueue = streamQueueOpen();
|
||||
if ((*pTask)->streamInfo.inputQueue == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
}
|
||||
|
||||
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
|
||||
code = dsDataSinkMgtInit(&cfg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -259,12 +252,14 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
|
|||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
|
||||
taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem);
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
void* qExtractReaderFromStreamScanner(void* scanner) {
|
||||
SStreamScanInfo* pInfo = scanner;
|
||||
|
|
|
@ -39,7 +39,7 @@ static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capac
|
|||
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
|
||||
const char* dbName);
|
||||
|
||||
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
|
||||
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
|
||||
|
||||
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
|
||||
#if 0
|
||||
|
@ -874,22 +874,22 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
|
|||
return true;
|
||||
}
|
||||
|
||||
static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex) {
|
||||
SResultRowInfo dumyInfo;
|
||||
static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlockInfo* pDataBlockInfo,
|
||||
int32_t* pRowIndex) {
|
||||
SResultRowInfo dumyInfo;
|
||||
dumyInfo.cur.pageId = -1;
|
||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[*pRowIndex], pInterval,
|
||||
TSDB_ORDER_ASC);
|
||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
|
||||
STimeWindow endWin = win;
|
||||
STimeWindow preWin = win;
|
||||
while (1) {
|
||||
(*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey,
|
||||
binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||
(*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey, binarySearchForKey, NULL,
|
||||
TSDB_ORDER_ASC);
|
||||
do {
|
||||
preWin = endWin;
|
||||
getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
|
||||
} while (tsCol[(*pRowIndex) - 1] >= endWin.skey);
|
||||
endWin = preWin;
|
||||
if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows ) {
|
||||
if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows) {
|
||||
win.ekey = endWin.ekey;
|
||||
return win;
|
||||
}
|
||||
|
@ -923,7 +923,8 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t
|
|||
pInfo->updateWin.ekey = tsCols[*pRowIndex - 1];
|
||||
// win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, TSDB_ORDER_ASC);
|
||||
// (*pRowIndex) +=
|
||||
// getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||
// getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL,
|
||||
// TSDB_ORDER_ASC);
|
||||
}
|
||||
needRead = true;
|
||||
} else if (isStateWindow(pInfo)) {
|
||||
|
@ -1177,10 +1178,9 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) {
|
||||
SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j);
|
||||
if (pResCol->info.colId == pColMatchInfo->colId) {
|
||||
|
||||
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId);
|
||||
colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
|
||||
// taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
|
||||
// taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
|
||||
colExists = true;
|
||||
break;
|
||||
}
|
||||
|
@ -1200,14 +1200,14 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
||||
GET_TASKID(pTaskInfo));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
blockDataFreeRes((SSDataBlock*) pBlock);
|
||||
blockDataFreeRes((SSDataBlock*)pBlock);
|
||||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
}
|
||||
|
||||
doFilter(pInfo->pCondition, pInfo->pRes);
|
||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||
blockDataFreeRes((SSDataBlock*) pBlock);
|
||||
blockDataFreeRes((SSDataBlock*)pBlock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1441,6 +1441,29 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNo
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
|
||||
#if 1
|
||||
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
|
||||
STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info;
|
||||
destroyTableScanOperatorInfo(pTableScanInfo, 1);
|
||||
}
|
||||
#endif
|
||||
if (pStreamScan->tqReader) {
|
||||
tqCloseReader(pStreamScan->tqReader);
|
||||
}
|
||||
if (pStreamScan->pColMatchInfo) {
|
||||
taosArrayDestroy(pStreamScan->pColMatchInfo);
|
||||
}
|
||||
updateInfoDestroy(pStreamScan->pUpdateInfo);
|
||||
blockDataDestroy(pStreamScan->pRes);
|
||||
blockDataDestroy(pStreamScan->pUpdateRes);
|
||||
blockDataDestroy(pStreamScan->pPullDataRes);
|
||||
blockDataDestroy(pStreamScan->pDeleteDataRes);
|
||||
taosArrayDestroy(pStreamScan->pBlockLists);
|
||||
taosMemoryFree(pStreamScan);
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
|
||||
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
|
||||
uint64_t taskId) {
|
||||
|
@ -1555,8 +1578,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doStreamScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamScan, NULL, NULL, destroyStreamScanOperatorInfo,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
return pOperator;
|
||||
|
||||
|
|
|
@ -1340,13 +1340,13 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
|
|||
void* key = taosHashGetKey(pIte, &keyLen);
|
||||
uint64_t groupId = *(uint64_t*)key;
|
||||
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
|
||||
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
|
||||
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
|
||||
STimeWindow win;
|
||||
win.skey = ts;
|
||||
win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
||||
SWinRes winRe = {
|
||||
.ts = win.skey,
|
||||
.groupId = groupId,
|
||||
SWinRes winRe = {
|
||||
.ts = win.skey,
|
||||
.groupId = groupId,
|
||||
};
|
||||
void* chIds = taosHashGet(pPullDataMap, &winRe, sizeof(SWinRes));
|
||||
if (isCloseWindow(&win, pSup)) {
|
||||
|
@ -1537,7 +1537,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
for (int32_t i = 0; i < size; i++) {
|
||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i);
|
||||
destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput);
|
||||
taosMemoryFreeClear(pChildOp->info);
|
||||
taosMemoryFreeClear(pChildOp);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -74,63 +74,62 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
|
|||
}
|
||||
|
||||
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
|
||||
int32_t cnt = 0;
|
||||
void* data = NULL;
|
||||
while (1) {
|
||||
int32_t cnt = 0;
|
||||
void* data = NULL;
|
||||
while (1) {
|
||||
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
||||
if (qItem == NULL) {
|
||||
qDebug("stream exec over, queue empty");
|
||||
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
||||
if (qItem == NULL) {
|
||||
qDebug("stream exec over, queue empty");
|
||||
break;
|
||||
}
|
||||
if (data == NULL) {
|
||||
data = qItem;
|
||||
streamQueueProcessSuccess(pTask->inputQueue);
|
||||
continue;
|
||||
} else {
|
||||
if (streamAppendQueueItem(data, qItem) < 0) {
|
||||
streamQueueProcessFail(pTask->inputQueue);
|
||||
break;
|
||||
}
|
||||
if (data == NULL) {
|
||||
data = qItem;
|
||||
streamQueueProcessSuccess(pTask->inputQueue);
|
||||
continue;
|
||||
} else {
|
||||
if (streamAppendQueueItem(data, qItem) < 0) {
|
||||
streamQueueProcessFail(pTask->inputQueue);
|
||||
break;
|
||||
} else {
|
||||
cnt++;
|
||||
streamQueueProcessSuccess(pTask->inputQueue);
|
||||
taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
|
||||
taosFreeQitem(qItem);
|
||||
}
|
||||
cnt++;
|
||||
streamQueueProcessSuccess(pTask->inputQueue);
|
||||
taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
|
||||
taosFreeQitem(qItem);
|
||||
}
|
||||
}
|
||||
if (data == NULL) break;
|
||||
}
|
||||
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
|
||||
if (data) streamFreeQitem(data);
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
qDebug("stream task %d exec begin, batch msg: %d", pTask->taskId, cnt);
|
||||
streamTaskExecImpl(pTask, data, pRes);
|
||||
qDebug("stream task %d exec end", pTask->taskId);
|
||||
if (data == NULL) return pRes;
|
||||
|
||||
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
|
||||
streamTaskExecImpl(pTask, data, pRes);
|
||||
qDebug("stream task %d exec end", pTask->taskId);
|
||||
|
||||
if (taosArrayGetSize(pRes) != 0) {
|
||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||
if (qRes == NULL) {
|
||||
streamQueueProcessFail(pTask->inputQueue);
|
||||
taosArrayDestroy(pRes);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pRes) != 0) {
|
||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||
if (qRes == NULL) {
|
||||
streamQueueProcessFail(pTask->inputQueue);
|
||||
taosArrayDestroy(pRes);
|
||||
return NULL;
|
||||
}
|
||||
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||
qRes->blocks = pRes;
|
||||
if (streamTaskOutput(pTask, qRes) < 0) {
|
||||
/*streamQueueProcessFail(pTask->inputQueue);*/
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
taosFreeQitem(qRes);
|
||||
return NULL;
|
||||
}
|
||||
/*streamQueueProcessSuccess(pTask->inputQueue);*/
|
||||
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||
qRes->blocks = pRes;
|
||||
if (streamTaskOutput(pTask, qRes) < 0) {
|
||||
/*streamQueueProcessFail(pTask->inputQueue);*/
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
taosFreeQitem(qRes);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
streamFreeQitem(data);
|
||||
/*streamQueueProcessSuccess(pTask->inputQueue);*/
|
||||
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
}
|
||||
|
||||
streamFreeQitem(data);
|
||||
return pRes;
|
||||
}
|
||||
|
||||
|
@ -171,6 +170,11 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
}
|
||||
FAIL:
|
||||
if (pRes) taosArrayDestroy(pRes);
|
||||
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
|
||||
return -1;
|
||||
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
|
||||
tFreeSStreamTask(pTask);
|
||||
return 0;
|
||||
} else {
|
||||
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -163,6 +163,6 @@ void tFreeSStreamTask(SStreamTask* pTask) {
|
|||
streamQueueClose(pTask->inputQueue);
|
||||
streamQueueClose(pTask->outputQueue);
|
||||
if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
|
||||
qDestroyTask(pTask->exec.executor);
|
||||
if (pTask->exec.executor) qDestroyTask(pTask->exec.executor);
|
||||
taosMemoryFree(pTask);
|
||||
}
|
||||
|
|
|
@ -41,7 +41,8 @@ print =============== show streams ================================
|
|||
sql show streams;
|
||||
print $data00 $data01 $data02
|
||||
|
||||
if $data00 != d1 then
|
||||
if $data00 != sma_index_name1 then
|
||||
print $data00
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
Loading…
Reference in New Issue