refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-04-09 01:39:09 +08:00
parent 7e734e3088
commit f5b52749c2
36 changed files with 625 additions and 419 deletions

View File

@ -26,6 +26,7 @@ extern "C" {
typedef void* qTaskInfo_t;
typedef void* DataSinkHandle;
struct SRpcMsg;
struct SSubplan;
@ -118,7 +119,7 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
* @param isAdd
* @return
*/
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd);
int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd, SArray* pList);
/**
* Create the exec task object according to task json
@ -162,6 +163,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo);
* @return
*/
int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode);
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode);
bool qTaskIsExecuting(qTaskInfo_t qinfo);
@ -181,21 +183,11 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len);
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key);
/**
* return the scan info, in the form of tuple of two items, including table uid and current timestamp
* @param tinfo
* @param uid
* @param ts
* @return
*/
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
// int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t ver);
//
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit);
void qStreamSetOpen(qTaskInfo_t tinfo);

View File

@ -194,6 +194,7 @@ typedef struct SRequestConnInfo {
typedef void (*__freeFunc)(void* param);
// todo add creator/destroyer function
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; // async callback function
STargetInfo target; // for update epset

View File

@ -225,15 +225,15 @@ static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) {
void* streamQueueNextItem(SStreamQueue* queue);
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit);
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type);
void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit);
SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit);
typedef struct {
char* qmsg;
// followings are not applicable to encoder and decoder
void* executor;
char* qmsg;
void* pExecutor; // not applicable to encoder and decoder
struct STqReader* pTqReader; // not applicable to encoder and decoder
} STaskExec;
typedef struct {
@ -280,16 +280,20 @@ typedef struct {
SEpSet epSet;
} SStreamChildEpInfo;
struct SStreamTask {
int64_t streamId;
int32_t taskId;
int32_t totalLevel;
int8_t taskLevel;
int8_t outputType;
int16_t dispatchMsgType;
typedef struct SStreamId {
int64_t streamId;
int32_t taskId;
const char* idStr;
} SStreamId;
int8_t taskStatus;
int8_t schedStatus;
struct SStreamTask {
SStreamId id;
int32_t totalLevel;
int8_t taskLevel;
int8_t outputType;
int16_t dispatchMsgType;
int8_t taskStatus;
int8_t schedStatus;
// node info
int32_t selfChildId;
@ -319,11 +323,8 @@ struct SStreamTask {
STaskSinkFetch fetchSink;
};
int8_t inputStatus;
int8_t outputStatus;
// STaosQueue* inputQueue1;
// STaosQall* inputQall;
int8_t inputStatus;
int8_t outputStatus;
SStreamQueue* inputQueue;
SStreamQueue* outputQueue;
@ -345,8 +346,8 @@ struct SStreamTask {
SArray* checkReqIds; // shuffle
int32_t refCnt;
int64_t checkpointingId;
int32_t checkpointAlignCnt;
int64_t checkpointingId;
int32_t checkpointAlignCnt;
};
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
@ -355,8 +356,9 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask);
int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem);
bool tInputQueueIsFull(const SStreamTask* pTask);
static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);

View File

@ -138,6 +138,7 @@ typedef struct {
int8_t enableRef;
} SWalFilterCond;
// todo hide this struct
typedef struct {
SWal *pWal;
int64_t readerId;

View File

@ -36,14 +36,6 @@ extern "C" {
#include "tconfig.h"
#define CHECK_CODE_GOTO(expr, label) \
do { \
code = expr; \
if (TSDB_CODE_SUCCESS != code) { \
goto label; \
} \
} while (0)
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
#define HEARTBEAT_INTERVAL 1500 // ms
@ -286,28 +278,7 @@ static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
return (SReqResultInfo*)&msg->resInfo;
}
static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
pRspObj->resIter++;
if (pRspObj->resIter < pRspObj->rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);
if (pRspObj->rsp.withSchema) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter);
setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols);
taosMemoryFreeClear(pRspObj->resInfo.row);
taosMemoryFreeClear(pRspObj->resInfo.pCol);
taosMemoryFreeClear(pRspObj->resInfo.length);
taosMemoryFreeClear(pRspObj->resInfo.convertBuf);
taosMemoryFreeClear(pRspObj->resInfo.convertJson);
}
setQueryResultFromRsp(&pRspObj->resInfo, pRetrieve, convertUcs4, false);
return &pRspObj->resInfo;
}
return NULL;
}
SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4);
static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) {
if (TD_RES_QUERY(res)) return &(((SRequestObj*)res)->body.resInfo);
@ -320,7 +291,6 @@ extern int32_t clientConnRefPool;
extern int32_t timestampDeltaLimit;
extern int64_t lastClusterId;
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType);
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
@ -373,7 +343,6 @@ void taos_close_internal(void* taos);
// global, called by mgmt
int hbMgrInit();
void hbMgrCleanUp();
int hbHandleRsp(SClientHbBatchRsp* hbRsp);
// cluster level
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key);
@ -386,9 +355,6 @@ void stopAllRequests(SHashObj* pRequests);
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
// --- mq
void hbMgrInitMqHbRspHandle();
typedef struct SSqlCallbackWrapper {
SParseContext* pParseCtx;
SCatalogReq* pCatalogReq;

View File

@ -1039,8 +1039,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
.sysInfo = pRequest->pTscObj->sysInfo,
.allocatorId = pRequest->allocatorRefId};
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
SQueryPlan* pDag = NULL;
SQueryPlan* pDag = NULL;
int64_t st = taosGetTimestampUs();
int32_t code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
@ -1052,7 +1051,6 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
}
pRequest->metric.execStart = taosGetTimestampUs();
pRequest->metric.planCostUs = pRequest->metric.execStart - st;
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {

View File

@ -154,7 +154,7 @@ typedef struct {
typedef struct {
int8_t tmqRspType;
int32_t epoch; // epoch can be used to guard the vgHandle
int32_t epoch; // epoch can be used to guard the vgHandle
int32_t vgId;
SMqClientVg* vgHandle;
SMqClientTopic* topicHandle;
@ -210,6 +210,11 @@ typedef struct {
tmq_t* pTmq;
} SMqCommitCbParam;
typedef struct SSyncCommitInfo {
tsem_t sem;
int32_t code;
} SSyncCommitInfo;
static int32_t doAskEp(tmq_t* tmq);
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
@ -521,11 +526,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
return TSDB_CODE_OUT_OF_MEMORY;
}
pMsgSendInfo->msgInfo = (SDataBuf){
.pData = buf,
.len = sizeof(SMsgHead) + len,
.handle = NULL,
};
pMsgSendInfo->msgInfo = (SDataBuf) { .pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL };
pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0;
@ -786,11 +787,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
goto OVER;
}
sendInfo->msgInfo = (SDataBuf){
.pData = pReq,
.len = tlen,
.handle = NULL,
};
sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = tlen, .handle = NULL };
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
@ -2115,11 +2112,6 @@ void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void*
}
}
typedef struct SSyncCommitInfo {
tsem_t sem;
int32_t code;
} SSyncCommitInfo;
static void commitCallBackFn(tmq_t *pTmq, int32_t code, void* param) {
SSyncCommitInfo* pInfo = (SSyncCommitInfo*) param;
pInfo->code = code;
@ -2298,3 +2290,26 @@ void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, cons
waitingRspNum);
}
}
SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
pRspObj->resIter++;
if (pRspObj->resIter < pRspObj->rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);
if (pRspObj->rsp.withSchema) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter);
setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols);
taosMemoryFreeClear(pRspObj->resInfo.row);
taosMemoryFreeClear(pRspObj->resInfo.pCol);
taosMemoryFreeClear(pRspObj->resInfo.length);
taosMemoryFreeClear(pRspObj->resInfo.convertBuf);
taosMemoryFreeClear(pRspObj->resInfo.convertJson);
}
setQueryResultFromRsp(&pRspObj->resInfo, pRetrieve, convertUcs4, false);
return &pRspObj->resInfo;
}
return NULL;
}

View File

@ -158,7 +158,10 @@ void tFreeStreamObj(SStreamObj *pStream) {
taosMemoryFree(pStream->sql);
taosMemoryFree(pStream->ast);
taosMemoryFree(pStream->physicalPlan);
if (pStream->outputSchema.nCols) taosMemoryFree(pStream->outputSchema.pSchema);
if (pStream->outputSchema.nCols) {
taosMemoryFree(pStream->outputSchema.pSchema);
}
int32_t sz = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < sz; i++) {
@ -166,11 +169,14 @@ void tFreeStreamObj(SStreamObj *pStream) {
int32_t taskSz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < taskSz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
tFreeSStreamTask(pTask);
tFreeStreamTask(pTask);
}
taosArrayDestroy(pLevel);
}
taosArrayDestroy(pStream->tasks);
// tagSchema.pSchema
if (pStream->tagSchema.nCols > 0) {
taosMemoryFree(pStream->tagSchema.pSchema);

View File

@ -138,7 +138,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
for (int32_t j = 0; j < sinkLvSize; j++) {
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pLastLevelTask->taskId;
pVgInfo->taskId = pLastLevelTask->id.taskId;
break;
}
}
@ -149,7 +149,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
pTask->fixedEpDispatcher.taskId = lastLevelTask->id.taskId;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
}
@ -440,7 +440,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
pTask->fixedEpDispatcher.taskId = pInnerTask->taskId;
pTask->fixedEpDispatcher.taskId = pInnerTask->id.taskId;
pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId;
pTask->fixedEpDispatcher.epSet = pInnerTask->epSet;
@ -460,7 +460,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
pEpInfo->childId = pTask->selfChildId;
pEpInfo->epSet = pTask->epSet;
pEpInfo->nodeId = pTask->nodeId;
pEpInfo->taskId = pTask->taskId;
pEpInfo->taskId = pTask->id.taskId;
taosArrayPush(pInnerTask->childEpInfo, &pEpInfo);
sdbRelease(pSdb, pVgroup);
}

View File

@ -600,7 +600,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
return -1;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->taskId = pTask->taskId;
pReq->taskId = pTask->id.taskId;
STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
action.pCont = pReq;
@ -1208,7 +1208,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// task id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pTask->taskId, false);
colDataSetVal(pColInfo, numOfRows, (const char *)&pTask->id.taskId, false);
// node type
char nodeType[20 + VARSTR_HEADER_SIZE] = {0};

View File

@ -83,14 +83,11 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
return -1;
}
SReadHandle mgHandle = {
.vnode = NULL,
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
.pStateBackend = pTask->pState,
};
int32_t numOfChildEp = taosArrayGetSize(pTask->childEpInfo);
SReadHandle mgHandle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState };
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, 0);
ASSERT(pTask->exec.executor);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, 0);
ASSERT(pTask->exec.pExecutor);
streamSetupTrigger(pTask);
return 0;

View File

@ -256,15 +256,15 @@ void tqCloseReader(STqReader *);
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
void tqNextBlock(STqReader *pReader, SFetchRet *ret);
int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
// int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock2(STqReader *pReader);
bool tqNextDataBlock(STqReader *pReader);
bool tqNextDataBlockFilterOut2(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);

View File

@ -80,7 +80,7 @@ typedef struct {
typedef struct {
int8_t subType;
STqReader* pExecReader;
STqReader* pTqReader;
qTaskInfo_t task;
union {
STqExecCol execCol;

View File

@ -531,10 +531,11 @@ static void freePayload(const void* key, size_t keyLen, void* value) {
return;
}
SHashObj* pHashObj = (SHashObj*)p[0];
SHashObj* pHashObj = (SHashObj*)p[0];
STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
{
if (pEntry != NULL && (*pEntry) != NULL) {
int64_t st = taosGetTimestampUs();
SListIter iter = {0};
@ -547,9 +548,9 @@ static void freePayload(const void* key, size_t keyLen, void* value) {
void* tmp = tdListPopNode(&((*pEntry)->list), pNode);
taosMemoryFree(tmp);
int64_t et = taosGetTimestampUs();
metaInfo("clear items in cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)),
(et - st) / 1000.0);
double el = (taosGetTimestampUs() - st) / 1000.0;
metaInfo("clear items in meta-cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)),
el);
break;
}
}

View File

@ -168,7 +168,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids,
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pRSmaInfo->taskInfo[i]) {
if ((terrno = qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0) {
if ((terrno = qUpdateTableListForStreamScanner(pRSmaInfo->taskInfo[i], tbUids, isAdd, NULL)) < 0) {
tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i,
terrstr());

View File

@ -15,6 +15,8 @@
#include "tq.h"
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
int32_t tqInit() {
int8_t old;
while (1) {
@ -57,12 +59,12 @@ static void destroyTqHandle(void* data) {
if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
tqCloseReader(pData->execHandle.pExecReader);
tqCloseReader(pData->execHandle.pTqReader);
walCloseReader(pData->pWalReader);
taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
walCloseReader(pData->pWalReader);
tqCloseReader(pData->execHandle.pExecReader);
tqCloseReader(pData->execHandle.pTqReader);
}
}
@ -78,12 +80,33 @@ static void tqPushEntryFree(void* data) {
taosMemoryFree(p);
}
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
pLeft->val.version <= pRight->val.version;
}
// stream_task:stream_id:task_id
static void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) {
int32_t n = 12;
char* p = dst;
memcpy(p, "stream_task:", n);
p += n;
int32_t inc = tintToHex(streamId, p);
p += inc;
*(p++) = ':';
tintToHex(taskId, p);
}
STQ* tqOpen(const char* path, SVnode* pVnode) {
STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
if (pTq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pTq->path = taosStrdup(path);
pTq->pVnode = pVnode;
pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
@ -249,11 +272,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
return 0;
}
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
pLeft->val.version <= pRight->val.version;
}
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
STqOffset offset = {0};
int32_t vgId = TD_VID(pTq->pVnode);
@ -432,8 +450,6 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0;
}
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
uint64_t consumerId = pRequest->consumerId;
@ -805,10 +821,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, req.newConsumerId);
void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.task, &scanner);
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
pHandle->execHandle.pExecReader = tqOpenReader(pVnode);
pHandle->execHandle.pTqReader = tqOpenReader(pVnode);
pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
@ -827,8 +843,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
}
pHandle->execHandle.pExecReader = tqOpenReader(pVnode);
tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
pHandle->execHandle.pTqReader = tqOpenReader(pVnode);
tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
taosArrayDestroy(tbUidList);
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
@ -886,16 +902,14 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
#if 0
if (pTask->taskLevel == TASK_LEVEL__AGG) {
A(taosArrayGetSize(pTask->childEpInfo) != 0);
}
#endif
// todo extract method
char buf[128] = {0};
sprintf(buf, "0x%"PRIx64"-%d", pTask->id.streamId, pTask->id.taskId);
int32_t vgId = TD_VID(pTq->pVnode);
pTask->id.idStr = taosStrdup(buf);
pTask->refCnt = 1;
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputQueue = streamQueueOpen();
pTask->outputQueue = streamQueueOpen();
@ -920,14 +934,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
}
SReadHandle handle = {
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTqReader = 1,
.pStateBackend = pTask->pState,
};
.meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState};
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
if (pTask->exec.executor == NULL) {
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
if (pTask->exec.pExecutor == NULL) {
return -1;
}
@ -936,14 +946,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
if (pTask->pState == NULL) {
return -1;
}
SReadHandle mgHandle = {
.vnode = NULL,
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
.pStateBackend = pTask->pState,
};
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, vgId);
if (pTask->exec.executor == NULL) {
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo);
SReadHandle mgHandle = { .vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState};
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, vgId);
if (pTask->exec.pExecutor == NULL) {
return -1;
}
}
@ -964,15 +972,26 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
ver1 = info.skmVer;
}
pTask->tbSink.pTSchema =
tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, ver1);
SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper;
pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
if(pTask->tbSink.pTSchema == NULL) {
return -1;
}
}
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
pTask->exec.pTqReader = tqOpenReader(pTq->pVnode);
if (pTask->exec.pTqReader == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
SArray* pList = qGetQueriedTableListInfo(pTask->exec.pExecutor);
tqReaderAddTbUidList(pTask->exec.pTqReader, pList);
}
streamSetupTrigger(pTask);
tqInfo("expand stream task on vg %d, task id %d, child id %d, level %d", vgId, pTask->taskId, pTask->selfChildId, pTask->taskLevel);
tqInfo("vgId:%d expand stream task, s-task:%s, child id %d, level %d", vgId, pTask->id.idStr, pTask->selfChildId, pTask->taskLevel);
return 0;
}
@ -995,6 +1014,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
.upstreamNodeId = req.upstreamNodeId,
.upstreamTaskId = req.upstreamTaskId,
};
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask && atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
rsp.status = 1;
@ -1085,6 +1105,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
taosMemoryFree(pTask);
return -1;
}
tDecoderClear(&decoder);
// 2.save task
@ -1298,7 +1319,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);
qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->id.taskId, ver);
if (!failed) {
SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
@ -1308,7 +1329,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
atomic_add_fetch_32(pRefBlock->dataRef, 1);
if (tAppendDataForStream(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
qError("stream task input del failed, task id %d", pTask->taskId);
qError("stream task input del failed, task id %d", pTask->id.taskId);
atomic_sub_fetch_32(pRef, 1);
taosFreeQitem(pRefBlock);
@ -1316,7 +1337,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
}
if (streamSchedExec(pTask) < 0) {
qError("stream task launch failed, task id %d", pTask->taskId);
qError("stream task launch failed, task id %d", pTask->id.taskId);
continue;
}
@ -1343,12 +1364,12 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
if (!failed) {
if (tAppendDataForStream(pTask, (SStreamQueueItem*)pStreamBlock) < 0) {
qError("stream task input del failed, task id %d", pTask->taskId);
qError("stream task input del failed, task id %d", pTask->id.taskId);
continue;
}
if (streamSchedExec(pTask) < 0) {
qError("stream task launch failed, task id %d", pTask->taskId);
qError("stream task launch failed, task id %d", pTask->id.taskId);
continue;
}
} else {
@ -1361,15 +1382,82 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
return 0;
}
static int32_t doAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) {
int32_t code = tAppendDataForStream(pTask, pQueueItem);
if (code < 0) {
tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, ver);
return -1;
}
if (streamSchedExec(pTask) < 0) {
tqError("stream task:%d failed to be launched, code:%s", pTask->id.taskId, tstrerror(terrno));
return -1;
}
return TSDB_CODE_SUCCESS;
}
static void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver) {
STqOffset offset = {0};
tqOffsetResetToLog(&offset.val, ver);
tstrncpy(offset.subKey, pKey, tListLen(offset.subKey));
// keep the offset info in the offset store
tqOffsetWrite(pOffsetStore, &offset);
}
static int32_t addSubmitBlockNLaunchTask(STqOffsetStore* pOffsetStore, SStreamTask* pTask, SStreamDataSubmit2* pSubmit,
const char* key, int64_t ver) {
doSaveTaskOffset(pOffsetStore, key, ver);
int32_t code = doAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pSubmit, ver);
// remove the offset, if all functions are completed successfully.
if (code == TSDB_CODE_SUCCESS) {
tqOffsetDelete(pOffsetStore, key);
}
return TSDB_CODE_SUCCESS;
}
static void saveOffsetForAllTasks(STQ* pTq, SPackedData submit) {
void* pIter = NULL;
while(1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
continue;
}
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId,
pTask->taskStatus);
continue;
}
char key[128] = {0};
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
if (pOffset == NULL) {
doSaveTaskOffset(pTq->pOffsetStore, key, submit.ver);
}
}
}
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
void* pIter = NULL;
bool succ = true;
SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit);
SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit, STREAM_INPUT__DATA_SUBMIT);
if (pSubmit == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to create data submit for stream since out of memory");
succ = false;
saveOffsetForAllTasks(pTq, submit);
return -1;
}
while (1) {
@ -1384,35 +1472,85 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
}
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->taskId, pTask->taskStatus);
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId,
pTask->taskStatus);
continue;
}
tqDebug("data submit enqueue stream task:%d, ver: %" PRId64, pTask->taskId, submit.ver);
if (succ) {
int32_t code = tAppendDataForStream(pTask, (SStreamQueueItem*)pSubmit);
if (code < 0) {
// let's handle the back pressure
// check if offset value exists
char key[128] = {0};
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
tqError("stream task:%d failed to put into queue for, too many", pTask->taskId);
continue;
if (tInputQueueIsFull(pTask)) {
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
int64_t ver = submit.ver;
if (pOffset == NULL) {
doSaveTaskOffset(pTq->pOffsetStore, key, submit.ver);
} else {
ver = pOffset->val.version;
}
if (streamSchedExec(pTask) < 0) {
tqError("stream task:%d launch failed, code:%s", pTask->taskId, tstrerror(terrno));
continue;
tqDebug("s-task:%s input queue is full, do nothing, start ver:%" PRId64, pTask->id.idStr, ver);
continue;
}
// check if offset value exists
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
if (pOffset != NULL) {
// seek the stored version and extract data from WAL
int32_t code = tqSeekVer(pTask->exec.pTqReader, pOffset->val.version, "");
// all data has been retrieved from WAL, let's try submit block directly.
if (code == TSDB_CODE_SUCCESS) { // all data retrieved, abort
// append the data for the stream
SFetchRet ret = {0};
terrno = 0;
tqNextBlock(pTask->exec.pTqReader, &ret);
if (ret.fetchType == FETCH_TYPE__DATA) {
SStreamDataBlock* pBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (pBlocks == NULL) { // failed, do nothing
terrno = TSDB_CODE_OUT_OF_MEMORY;
continue;
}
ret.data.info.type = STREAM_NORMAL;
pBlocks->type = STREAM_INPUT__DATA_BLOCK;
pBlocks->sourceVer = pOffset->val.version;
pBlocks->blocks = taosArrayInit(0, sizeof(SSDataBlock));
taosArrayPush(pBlocks->blocks, &ret.data);
int64_t* ts = (int64_t*)(((SColumnInfoData*)ret.data.pDataBlock->pData)->pData);
// tqDebug("-----------%ld\n", ts[0]);
code = doAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pBlocks, pBlocks->sourceVer);
if (code == TSDB_CODE_SUCCESS) {
pOffset->val.version = pTask->exec.pTqReader->pWalReader->curVersion;
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
pOffset->val.version);
}
} else { // FETCH_TYPE__NONE, let's try submit block directly
tqDebug("s-task:%s data in WAL are all consumed, try data in submit message", pTask->id.idStr);
addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver);
}
// do nothing if failed, since the offset value is kept already
} else { // failed to seek to the WAL version
// todo handle the case where offset has been deleted in WAL, due to stream computing too slow
tqDebug("s-task:%s data in WAL are all consumed, try data in submit msg", pTask->id.idStr);
addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver);
}
} else {
streamTaskInputFail(pTask);
addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver);
}
}
if (pSubmit != NULL) {
streamDataSubmitDestroy(pSubmit);
taosFreeQitem(pSubmit);
}
streamDataSubmitDestroy(pSubmit);
taosFreeQitem(pSubmit);
return succ ? 0 : -1;
return 0;
}
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
@ -1420,6 +1558,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t taskId = pReq->taskId;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask) {
tqDebug("stream task:%d start to process run req", pTask->id.taskId);
streamProcessRunReq(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
@ -1456,7 +1595,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = ntohl(pRsp->upstreamTaskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
tqDebug("recv dispatch rsp, code: %x", pMsg->code);
tqDebug("recv dispatch rsp, code:%x", pMsg->code);
if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
@ -1484,10 +1623,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t taskId = req.dstTaskId;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask) {
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
streamProcessRetrieveReq(pTask, &req, &rsp);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tDeleteStreamRetrieveReq(&req);
@ -1523,10 +1659,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask) {
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
streamProcessDispatchReq(pTask, &req, &rsp, false);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
rpcFreeCont(pMsg->pCont);
@ -1543,10 +1676,7 @@ FAIL:
SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
if (pRspHead == NULL) {
SRpcMsg rsp = {
.code = TSDB_CODE_OUT_OF_MEMORY,
.info = pMsg->info,
};
SRpcMsg rsp = { .code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info };
tqDebug("send dispatch error rsp, code: %x", code);
tmsgSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
@ -1564,11 +1694,7 @@ FAIL:
pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;
SRpcMsg rsp = {
.code = code,
.info = pMsg->info,
.contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp),
.pCont = pRspHead,
};
.code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
tqDebug("send dispatch error rsp, code: %x", code);
tmsgSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);

View File

@ -320,15 +320,15 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
code = -1;
goto end;
}
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
if (handle.execHandle.pExecReader == NULL) {
handle.execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
if (handle.execHandle.pTqReader == NULL) {
tqError("cannot extract exec reader for %s", handle.subKey);
code = -1;
goto end;
}
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
handle.execHandle.pTqReader = tqOpenReader(pTq->pVnode);
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
(SSnapContext**)(&reader.sContext));
@ -343,8 +343,8 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
}
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList);
handle.execHandle.pTqReader = tqOpenReader(pTq->pVnode);
tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList);
taosArrayDestroy(tbUidList);
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,

View File

@ -128,31 +128,35 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) {
}
int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
if (!pStore->needCommit) return 0;
if (!pStore->needCommit) {
return 0;
}
// TODO file name should be with a newer version
char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
tqError("vgId:%d, cannot open file %s when commit offset since %s", pStore->pTq->pVnode->config.vgId, fname,
sysErrStr);
const char* err = strerror(errno);
tqError("vgId:%d, failed to open offset file %s, since %s", TD_VID(pStore->pTq->pVnode), fname, err);
taosMemoryFree(fname);
return -1;
}
taosMemoryFree(fname);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pStore->pHash, pIter);
if (pIter == NULL) break;
if (pIter == NULL) {
break;
}
STqOffset* pOffset = (STqOffset*)pIter;
int32_t bodyLen;
int32_t code;
tEncodeSize(tEncodeSTqOffset, pOffset, bodyLen, code);
if (code < 0) {
taosHashCancelIterate(pStore->pHash, pIter);
return -1;
@ -166,6 +170,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
SEncoder encoder;
tEncoderInit(&encoder, abuf, bodyLen);
tEncodeSTqOffset(&encoder, pOffset);
// write file
int64_t writeLen;
if ((writeLen = taosWriteFile(pFile, buf, totLen)) != totLen) {
@ -174,8 +179,10 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
taosMemoryFree(buf);
return -1;
}
taosMemoryFree(buf);
}
// close and rename file
taosCloseFile(&pFile);
pStore->needCommit = 0;

View File

@ -331,6 +331,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
if (msgType == TDMT_VND_SUBMIT) {
void* data = taosMemoryMalloc(len);
if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", vgId);
return -1;
@ -339,7 +340,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
memcpy(data, pReq, len);
SPackedData submit = {.msgStr = data, .msgLen = len, .ver = ver};
tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq);
tqDebug("tq copy submit msg:%p len:%d ver:%" PRId64 " from %p for stream", data, len, ver, pReq);
tqProcessSubmitReq(pTq, submit);
}

View File

@ -113,7 +113,7 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
}
SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->execHandle.pExecReader->pVnodeMeta, 0);
metaReaderInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, 0);
if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
metaReaderClear(&mr);
@ -262,8 +262,6 @@ STqReader* tqOpenReader(SVnode* pVnode) {
}
pReader->pVnodeMeta = pVnode->pMeta;
/*pReader->pMsg = NULL;*/
// pReader->ver = -1;
pReader->pColIdList = NULL;
pReader->cachedSchemaVer = 0;
pReader->cachedSchemaSuid = 0;
@ -296,10 +294,10 @@ void tqCloseReader(STqReader* pReader) {
int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) {
if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
tqDebug("tmq poll: wal reader failed to seek to ver:%"PRId64" code:%s, %s", ver, tstrerror(terrno), id);
tqDebug("wal reader failed to seek to ver:%"PRId64" code:%s, %s", ver, tstrerror(terrno), id);
return -1;
}
tqDebug("tmq poll: wal reader seek to ver:%"PRId64" %s", ver, id);
tqDebug("wal reader seek to ver:%"PRId64" %s", ver, id);
return 0;
}
@ -310,26 +308,28 @@ void tqNextBlock(STqReader* pReader, SFetchRet* ret) {
ret->fetchType = FETCH_TYPE__NONE;
return;
}
void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
void* pBody = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
int64_t ver = pReader->pWalReader->pHead->head.version;
tqReaderSetSubmitReq2(pReader, body, bodyLen, ver);
tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver);
}
while (tqNextDataBlock2(pReader)) {
while (tqNextDataBlock(pReader)) {
memset(&ret->data, 0, sizeof(SSDataBlock));
int32_t code = tqRetrieveDataBlock2(&ret->data, pReader, NULL);
if (code != 0 || ret->data.info.rows == 0) {
continue;
}
ret->fetchType = FETCH_TYPE__DATA;
return;
}
}
}
int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
pReader->msg2.msgStr = msgStr;
pReader->msg2.msgLen = msgLen;
pReader->msg2.ver = ver;
@ -346,7 +346,7 @@ int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen,
return 0;
}
bool tqNextDataBlock2(STqReader* pReader) {
bool tqNextDataBlock(STqReader* pReader) {
if (pReader->msg2.msgStr == NULL) {
return false;
}
@ -355,13 +355,20 @@ bool tqNextDataBlock2(STqReader* pReader) {
while (pReader->nextBlk < blockSz) {
tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen,
pReader->msg2.ver, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) return true;
if (pReader->tbIdHash == NULL) {
return true;
}
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
if (ret != NULL) {
tqDebug("tq reader block found, ver:%"PRId64", uid:%"PRId64, pReader->msg2.ver, pSubmitTbData->uid);
return true;
} else {
tqDebug("tq reader discard block, uid:%"PRId64", continue", pSubmitTbData->uid);
}
pReader->nextBlk++;
}
@ -901,7 +908,7 @@ int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
return 0;
}
int tqReaderAddTbUidList(STqReader* pReader, const SArray* tbUidList) {
int tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
if (pReader->tbIdHash == NULL) {
pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (pReader->tbIdHash == NULL) {
@ -910,8 +917,9 @@ int tqReaderAddTbUidList(STqReader* pReader, const SArray* tbUidList) {
}
}
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
int32_t numOfTables = taosArrayGetSize(pTableUidList);
for (int i = 0; i < numOfTables; i++) {
int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
}
@ -927,30 +935,34 @@ int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
return 0;
}
// todo update the table list in wal reader
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
void* pIter = NULL;
void* pIter = NULL;
int32_t vgId = TD_VID(pTq->pVnode);
// update the table list for each consumer handle
while (1) {
pIter = taosHashIterate(pTq->pHandle, pIter);
if (pIter == NULL) {
break;
}
STqHandle* pExec = (STqHandle*)pIter;
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
int32_t code = qUpdateQualifiedTableId(pExec->execHandle.task, tbUidList, isAdd);
STqHandle* pTqHandle = (STqHandle*)pIter;
if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd, NULL);
if (code != 0) {
tqError("update qualified table error for %s", pExec->subKey);
tqError("update qualified table error for %s", pTqHandle->subKey);
continue;
}
} else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) {
} else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
if (!isAdd) {
int32_t sz = taosArrayGetSize(tbUidList);
for (int32_t i = 0; i < sz; i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
taosHashPut(pExec->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
}
}
} else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
} else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
if (isAdd) {
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
SMetaReader mr = {0};
@ -965,35 +977,50 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
tDecoderClear(&mr.coder);
if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pExec->execHandle.execTb.suid) {
if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pTqHandle->execHandle.execTb.suid) {
tqDebug("table uid %" PRId64 " does not add to tq handle", *id);
continue;
}
tqDebug("table uid %" PRId64 " add to tq handle", *id);
taosArrayPush(qa, id);
}
metaReaderClear(&mr);
if (taosArrayGetSize(qa) > 0) {
tqReaderAddTbUidList(pExec->execHandle.pExecReader, qa);
tqReaderAddTbUidList(pTqHandle->execHandle.pTqReader, qa);
}
taosArrayDestroy(qa);
} else {
tqReaderRemoveTbUidList(pExec->execHandle.pExecReader, tbUidList);
tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
}
}
}
// update the table list handle for each stream scanner/wal reader
while (1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) break;
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd);
SArray* pList = NULL;
int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd, pList);
if (code != 0) {
tqError("update qualified table error for stream task %d", pTask->taskId);
tqError("vgId:%d, s-task:%s update qualified table error for stream task", vgId, pTask->id.idStr);
continue;
}
if (isAdd) { // only add qualified tables
tqReaderAddTbUidList(pTask->exec.pTqReader, pList);
} else {
tqReaderRemoveTbUidList(pTask->exec.pTqReader, tbUidList);
}
}
}
return 0;
}

View File

@ -38,7 +38,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
}
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader->pSchemaWrapper);
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pTqReader->pSchemaWrapper);
if (pSW == NULL) {
return -1;
}
@ -135,7 +135,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
if (pRsp->withTbName) {
if (pOffset->type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pExecReader->lastBlkUid;
int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, 1) < 0) {
continue;
}
@ -200,9 +200,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
SArray* pSchemas = taosArrayInit(0, sizeof(void*));
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
STqReader* pReader = pExec->pExecReader;
tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver);
while (tqNextDataBlock2(pReader)) {
STqReader* pReader = pExec->pTqReader;
tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver);
while (tqNextDataBlock(pReader)) {
taosArrayClear(pBlocks);
taosArrayClear(pSchemas);
SSubmitTbData* pSubmitTbDataRet = NULL;
@ -210,7 +210,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
}
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader->lastBlkUid;
int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
@ -259,8 +259,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
}
}
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReader* pReader = pExec->pExecReader;
tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver);
STqReader* pReader = pExec->pTqReader;
tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver);
while (tqNextDataBlockFilterOut2(pReader, pExec->execDb.pFilterOutTbUid)) {
taosArrayClear(pBlocks);
taosArrayClear(pSchemas);
@ -269,7 +269,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
}
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader->lastBlkUid;
int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);

View File

@ -87,7 +87,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
return;
}
tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz);
tqDebug("vgId:%d, s-task:%s write into table, block num: %d", TD_VID(pVnode), pTask->id.idStr, blockSz);
for (int32_t i = 0; i < blockSz; i++) {
bool createTb = true;
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
@ -382,7 +382,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
int32_t blockSz = taosArrayGetSize(pBlocks);
tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz);
tqDebug("vgId:%d, s-task:%s write results blocks:%d into table", TD_VID(pVnode), pTask->id.idStr, blockSz);
void* pBuf = NULL;
SArray* tagArray = NULL;
@ -475,11 +475,9 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
}
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
STagVal tagVal = {
.cid = pTSchema->numOfCols + step,
.type = pTagData->info.type,
};
void* pData = colDataGetData(pTagData, rowId);
STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type};
void* pData = colDataGetData(pTagData, rowId);
if (colDataIsNull_s(pTagData, rowId)) {
continue;
} else if (IS_VAR_DATA_TYPE(pTagData->info.type)) {

View File

@ -447,13 +447,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
walApplyVer(pVnode->pWal, version);
/*vInfo("vgId:%d, push msg begin", pVnode->config.vgId);*/
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
/*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
/*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/
// commit if need
if (needCommit) {

View File

@ -127,7 +127,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pOperator->status = OP_NOT_OPENED;
SStreamScanInfo* pInfo = pOperator->info;
qDebug("task stream set total blocks:%d %s", (int32_t)numOfBlocks, id);
qDebug("s-task set source blocks:%d %s", (int32_t)numOfBlocks, id);
ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0);
if (type == STREAM_INPUT__MERGED_SUBMIT) {
@ -363,27 +363,28 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
return qa;
}
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd, SArray* pList) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
const char* id = GET_TASKID(pTaskInfo);
int32_t code = 0;
if (isAdd) {
qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), id);
}
// traverse to the stream scanner node to add this table id
SOperatorInfo* pInfo = pTaskInfo->pRoot;
while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
pInfo = pInfo->pDownstream[0];
}
int32_t code = 0;
SOperatorInfo* pInfo = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id);
SStreamScanInfo* pScanInfo = pInfo->info;
if (isAdd) { // add new table id
SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));
int32_t numOfQualifiedTables = taosArrayGetSize(qa);
qDebug(" %d qualified child tables added into stream scanner", numOfQualifiedTables);
if (pList != NULL) {
taosArrayAddAll(pList, qa);
}
qDebug("%d qualified child tables added into stream scanner, %s", numOfQualifiedTables, id);
code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(qa);
@ -424,19 +425,6 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
}
}
#if 0
bool exists = false;
for (int32_t k = 0; k < taosArrayGetSize(pListInfo->pTableList); ++k) {
STableKeyInfo* pKeyInfo = taosArrayGet(pListInfo->pTableList, k);
if (pKeyInfo->uid == keyInfo.uid) {
qWarn("ignore duplicated query table uid:%" PRIu64 " added, %s", pKeyInfo->uid, pTaskInfo->id.str);
exists = true;
}
}
if (!exists) {
#endif
tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
}
@ -447,7 +435,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
taosArrayDestroy(qa);
} else { // remove the table id in current list
qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
qDebug("%d remove child tables from the stream scanner, %s", (int32_t)taosArrayGetSize(tableIdList), id);
taosWLockLatch(&pTaskInfo->lock);
code = tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList);
taosWUnLockLatch(&pTaskInfo->lock);
@ -1263,3 +1251,21 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
destroySendMsgInfo(pSendInfo);
}
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = tinfo;
SArray* plist = getTableListInfo(pTaskInfo);
// only extract table in the first elements
STableListInfo* pTableListInfo = taosArrayGetP(plist, 0);
SArray* pUidList = taosArrayInit(10, sizeof(uint64_t));
int32_t numOfTables = tableListGetSize(pTableListInfo);
for(int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
taosArrayPush(pUidList, &pKeyInfo->uid);
}
taosArrayDestroy(plist);
return pUidList;
}

View File

@ -2001,7 +2001,11 @@ void qStreamCloseTsdbReader(void* task) {
}
static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pScanInfo = pOperator->info;
STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pScanInfo = pOperator->info;
taosArrayPush(pList, &pScanInfo->base.pTableListInfo);
} else {

View File

@ -1635,7 +1635,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
if (pInfo->tqReader->msg2.msgStr == NULL) {
SPackedData submit = pTaskInfo->streamInfo.submit;
if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
if (tqReaderSetSubmitMsg(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
qError("submit msg messed up when initing stream submit block %p", submit.msgStr);
return NULL;
}
@ -1644,7 +1644,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
blockDataCleanup(pInfo->pRes);
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
while (tqNextDataBlock2(pInfo->tqReader)) {
while (tqNextDataBlock(pInfo->tqReader)) {
SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
@ -1805,7 +1805,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
/*resetTableScanInfo(pTSInfo, pWin);*/
tsdbReaderClose(pTSInfo->base.dataReader);
qDebug("4");
pTSInfo->base.dataReader = NULL;
pInfo->pTableScanOp->status = OP_OPENED;
@ -1888,7 +1887,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTSInfo->base.dataReader);
qDebug("5");
pTSInfo->base.dataReader = NULL;
@ -1915,6 +1913,7 @@ FETCH_NEXT_BLOCK:
if (pBlock->info.parTbName[0]) {
streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
}
// TODO move into scan
pBlock->info.calWin.skey = INT64_MIN;
pBlock->info.calWin.ekey = INT64_MAX;
@ -2057,7 +2056,7 @@ FETCH_NEXT_BLOCK:
int32_t current = pInfo->validBlockIndex++;
SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
totBlockNum);
continue;
@ -2066,7 +2065,7 @@ FETCH_NEXT_BLOCK:
blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock2(pInfo->tqReader)) {
while (tqNextDataBlock(pInfo->tqReader)) {
SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);

View File

@ -2432,10 +2432,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pSDataBlock->info.rows, numOfOutput);
SWinKey key = {
.ts = nextWin.skey,
.groupId = groupId,
};
SWinKey key = { .ts = nextWin.skey, .groupId = groupId };
saveOutputBuf(pInfo->pState, &key, pResult, pInfo->aggSup.resultRowSize);
releaseOutputBuf(pInfo->pState, &key, pResult);
if (pInfo->delKey.ts > key.ts) {
@ -4771,6 +4769,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
pInfo->numOfDatapack = 0;
break;
}
pInfo->numOfDatapack++;
printDataBlock(pBlock, "single interval recv");

View File

@ -44,7 +44,7 @@ int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq*
int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
#ifdef __cplusplus
}

View File

@ -16,6 +16,8 @@
#include "streamInc.h"
#include "ttimer.h"
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 2
int32_t streamInit() {
int8_t old;
while (1) {
@ -96,13 +98,14 @@ int32_t streamSchedExec(SStreamTask* pTask) {
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE);
return -1;
}
pRunReq->head.vgId = pTask->nodeId;
pRunReq->streamId = pTask->streamId;
pRunReq->taskId = pTask->taskId;
pRunReq->streamId = pTask->id.streamId;
pRunReq->taskId = pTask->id.taskId;
SRpcMsg msg = { .msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq) };
tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
@ -142,7 +145,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR
pCont->upstreamNodeId = htonl(pReq->upstreamNodeId);
pCont->upstreamTaskId = htonl(pReq->upstreamTaskId);
pCont->downstreamNodeId = htonl(pTask->nodeId);
pCont->downstreamTaskId = htonl(pTask->taskId);
pCont->downstreamTaskId = htonl(pTask->id.taskId);
pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
@ -155,7 +158,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
// enqueue
if (pData != NULL) {
qDebug("task %d(child %d) recv retrieve req from task %d, reqId %" PRId64, pTask->taskId, pTask->selfChildId,
qDebug("task %d(child %d) recv retrieve req from task %d, reqId %" PRId64, pTask->id.taskId, pTask->selfChildId,
pReq->srcTaskId, pReq->reqId);
pData->type = STREAM_INPUT__DATA_RETRIEVE;
@ -205,7 +208,7 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
}
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
qDebug("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId,
qDebug("task %d receive dispatch req from node %d task %d", pTask->id.taskId, pReq->upstreamNodeId,
pReq->upstreamTaskId);
streamTaskEnqueue(pTask, pReq, pRsp);
@ -228,12 +231,11 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
qDebug("task %d receive dispatch rsp, code: %x", pTask->taskId, code);
qDebug("task %d receive dispatch rsp, code: %x", pTask->id.taskId, code);
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
qDebug("task %d is shuffle, left waiting rsp %d", pTask->taskId, leftRsp);
qDebug("task %d is shuffle, left waiting rsp %d", pTask->id.taskId, leftRsp);
if (leftRsp > 0) return 0;
}
@ -261,7 +263,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
}
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
qDebug("task %d receive retrieve req from node %d task %d", pTask->id.taskId, pReq->srcNodeId, pReq->srcTaskId);
streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
@ -275,26 +277,43 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
return 0;
}
bool tInputQueueIsFull(const SStreamTask* pTask) {
return taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY;
}
int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) {
int8_t type = pItem->type;
if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit2* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit2*)pItem);
if (pSubmitBlock == NULL) {
qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask);
qDebug("task %d %p submit enqueue failed since out of memory", pTask->id.taskId, pTask);
terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
return -1;
}
int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1;
qDebug("stream task:%d %p submit enqueue %p %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->taskId,
pTask, pItem, pSubmitBlock, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen,
qDebug("s-task:%s submit enqueue %p %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->id.idStr,
pItem, pSubmitBlock, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen,
pSubmitBlock->submit.ver, total);
if (total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) {
qDebug("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY);
streamDataSubmitDestroy(pSubmitBlock);
return -1;
}
taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1;
if (total > 2) {
qDebug("stream task input queue is full, abort");
return -1;
}
qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, total);
taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(pTask->inputQueue->queue, pItem);

View File

@ -67,7 +67,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
return 0;
}
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit) {
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) {
SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0);
if (pDataSubmit == NULL) {
@ -82,7 +82,7 @@ SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit) {
pDataSubmit->submit = submit;
*pDataSubmit->dataRef = 1; // initialize the reference count to be 1
pDataSubmit->type = STREAM_INPUT__DATA_SUBMIT;
pDataSubmit->type = type;
return pDataSubmit;
}
@ -139,28 +139,27 @@ SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) {
return pSubmitClone;
}
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
ASSERT(elem);
if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) {
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)elem;
SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem;
taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
taosArrayDestroy(pBlockSrc->blocks);
taosFreeQitem(elem);
taosFreeQitem(pElem);
return dst;
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) {
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)dst;
SStreamDataSubmit2* pBlockSrc = (SStreamDataSubmit2*)elem;
SStreamDataSubmit2* pBlockSrc = (SStreamDataSubmit2*)pElem;
streamMergeSubmit(pMerged, pBlockSrc);
taosFreeQitem(elem);
taosFreeQitem(pElem);
return dst;
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) {
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit2* pMerged = streamMergedSubmitNew();
ASSERT(pMerged);
streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst);
streamMergeSubmit(pMerged, (SStreamDataSubmit2*)elem);
streamMergeSubmit(pMerged, (SStreamDataSubmit2*)pElem);
taosFreeQitem(dst);
taosFreeQitem(elem);
taosFreeQitem(pElem);
return (SStreamQueueItem*)pMerged;
} else {
return NULL;

View File

@ -121,9 +121,9 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
SStreamRetrieveReq req = {
.streamId = pTask->streamId,
.streamId = pTask->id.streamId,
.srcNodeId = pTask->nodeId,
.srcTaskId = pTask->taskId,
.srcTaskId = pTask->id.taskId,
.pRetrieve = pRetrieve,
.retrieveLen = dataStrLen,
};
@ -168,7 +168,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
}
buf = NULL;
qDebug("task %d(child %d) send retrieve req to task %d at node %d, reqId %" PRId64, pTask->taskId,
qDebug("s-task:%s (child %d) send retrieve req to task %d at node %d, reqId %" PRId64, pTask->id.idStr,
pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
}
code = 0;
@ -238,7 +238,7 @@ int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq*
msg.pCont = buf;
msg.msgType = TDMT_STREAM_TASK_CHECK;
qDebug("dispatch from task %d to task %d node %d: check msg", pTask->taskId, pReq->downstreamTaskId, nodeId);
qDebug("dispatch from task %d to task %d node %d: check msg", pTask->id.taskId, pReq->downstreamTaskId, nodeId);
tmsgSendReq(pEpSet, &msg);
@ -282,7 +282,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
tmsgSendReq(pEpSet, &msg);
qDebug("dispatch from task %d to task %d node %d: recover finish msg", pTask->taskId, pReq->taskId, vgId);
qDebug("dispatch from task %d to task %d node %d: recover finish msg", pTask->id.taskId, pReq->taskId, vgId);
return 0;
FAIL:
@ -319,7 +319,7 @@ int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* p
msg.pCont = buf;
msg.msgType = pTask->dispatchMsgType;
qDebug("dispatch from task %d to task %d node %d: data msg", pTask->taskId, pReq->taskId, vgId);
qDebug("dispatch from task %d to task %d node %d: data msg", pTask->id.taskId, pReq->taskId, vgId);
tmsgSendReq(pEpSet, &msg);
@ -382,9 +382,9 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
SStreamDispatchReq req = {
.streamId = pTask->streamId,
.streamId = pTask->id.streamId,
.dataSrcVgId = pData->srcVgId,
.upstreamTaskId = pTask->taskId,
.upstreamTaskId = pTask->id.taskId,
.upstreamChildId = pTask->selfChildId,
.upstreamNodeId = pTask->nodeId,
.blockNum = blockNum,
@ -408,7 +408,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
req.taskId = downstreamTaskId;
qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId,
qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->id.taskId, pTask->selfChildId,
downstreamTaskId, vgId);
if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) {
@ -432,9 +432,9 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
}
for (int32_t i = 0; i < vgSz; i++) {
pReqs[i].streamId = pTask->streamId;
pReqs[i].streamId = pTask->id.streamId;
pReqs[i].dataSrcVgId = pData->srcVgId;
pReqs[i].upstreamTaskId = pTask->taskId;
pReqs[i].upstreamTaskId = pTask->id.taskId;
pReqs[i].upstreamChildId = pTask->selfChildId;
pReqs[i].upstreamNodeId = pTask->nodeId;
pReqs[i].blockNum = 0;
@ -503,13 +503,13 @@ int32_t streamDispatch(SStreamTask* pTask) {
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
if (pBlock == NULL) {
qDebug("stream stop dispatching since no output: task %d", pTask->taskId);
qDebug("stream stop dispatching since no output: task %d", pTask->id.taskId);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return 0;
}
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
qDebug("stream dispatching: task %d", pTask->taskId);
qDebug("stream dispatching: task %d", pTask->id.taskId);
int32_t code = 0;
if (streamDispatchAllBlocks(pTask, pBlock) < 0) {

View File

@ -18,8 +18,9 @@
#define STREAM_EXEC_MAX_BATCH_NUM 100
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
int32_t code;
void* exec = pTask->exec.executor;
int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor;
while(pTask->taskLevel == TASK_LEVEL__SOURCE && atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
qError("stream task wait for the end of fill history");
taosMsleep(2);
@ -30,31 +31,35 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
const SStreamQueueItem* pItem = (const SStreamQueueItem*)data;
if (pItem->type == STREAM_INPUT__GET_RES) {
const SStreamTrigger* pTrigger = (const SStreamTrigger*)data;
qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data;
qDebug("stream task:%d %p set submit input %p %p %d %" PRId64, pTask->taskId, pTask, pSubmit, pSubmit->submit.msgStr,
qDebug("s-task:%s set submit blocks as input %p %p %d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr,
pSubmit->submit.msgLen, pSubmit->submit.ver);
qSetMultiStreamInput(exec, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data;
SArray* blocks = pBlock->blocks;
qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK);
SArray* pBlockList = pBlock->blocks;
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
qDebug("s-task:%s set sdata blocks as input num:%d, ver:%"PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data;
SArray* blocks = pMerged->submits;
qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT);
SArray* pBlockList = pMerged->submits;
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
qDebug("st-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)data;
qSetMultiStreamInput(exec, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else {
ASSERT(0);
}
// exec
// pExecutor
while (1) {
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
return 0;
@ -62,26 +67,28 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
SSDataBlock* output = NULL;
uint64_t ts = 0;
if ((code = qExecTask(exec, &output, &ts)) < 0) {
if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
if (code == TSDB_CODE_QRY_IN_EXEC) {
resetTaskInfo(exec);
resetTaskInfo(pExecutor);
}
/*ASSERT(false);*/
qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId,
terrstr());
qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr());
continue;
}
if (output == NULL) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SSDataBlock block = {0};
SSDataBlock block = {0};
const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)data;
ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
block.info.type = STREAM_PULL_OVER;
block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block);
qDebug("task %d(child %d) processed retrieve, reqId %" PRId64, pTask->taskId, pTask->selfChildId,
qDebug("task %d(child %d) processed retrieve, reqId %" PRId64, pTask->id.taskId, pTask->selfChildId,
pRetrieveBlock->reqId);
}
break;
@ -94,20 +101,21 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
continue;
}
qDebug("task %d(child %d) executed and get block", pTask->taskId, pTask->selfChildId);
qDebug("task %d(child %d) executed and get block", pTask->id.taskId, pTask->selfChildId);
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block);
}
return 0;
}
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
void* exec = pTask->exec.executor;
void* exec = pTask->exec.pExecutor;
qSetStreamOpOpen(exec);
bool finished = false;
@ -147,17 +155,17 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
batchCnt++;
qDebug("task %d scan exec block num %d, block limit %d", pTask->taskId, batchCnt, batchSz);
qDebug("task %d scan exec block num %d, block limit %d", pTask->id.taskId, batchCnt, batchSz);
if (batchCnt >= batchSz) break;
}
if (taosArrayGetSize(pRes) == 0) {
if (finished) {
taosArrayDestroy(pRes);
qDebug("task %d finish recover exec task ", pTask->taskId);
qDebug("task %d finish recover exec task ", pTask->id.taskId);
break;
} else {
qDebug("task %d continue recover exec task ", pTask->taskId);
qDebug("task %d continue recover exec task ", pTask->id.taskId);
continue;
}
}
@ -173,7 +181,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
streamTaskOutput(pTask, qRes);
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
qDebug("task %d scan exec dispatch block num %d", pTask->taskId, batchCnt);
qDebug("task %d scan exec dispatch block num %d", pTask->id.taskId, batchCnt);
streamDispatch(pTask);
}
if (finished) break;
@ -186,7 +194,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
// fetch all queue item, merge according to batchLimit
int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
if (numOfItems == 0) {
qDebug("task: %d, stream task exec over, queue empty", pTask->taskId);
qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
return 0;
}
SStreamQueueItem* pMerged = NULL;
@ -221,30 +229,33 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
int32_t streamExecForAll(SStreamTask* pTask) {
while (1) {
int32_t batchCnt = 1;
void* input = NULL;
int32_t batchSize = 1;
void* pInput = NULL;
// merge multiple input data if possible in the input queue.
while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
qDebug("stream task exec over, queue empty, task: %d", pTask->taskId);
qDebug("stream task exec over, queue empty, task: %d", pTask->id.taskId);
break;
}
if (input == NULL) {
input = qItem;
if (pInput == NULL) {
pInput = qItem;
streamQueueProcessSuccess(pTask->inputQueue);
if (pTask->taskLevel == TASK_LEVEL__SINK) {
break;
}
} else {
void* newRet;
if ((newRet = streamMergeQueueItem(input, qItem)) == NULL) {
void* newRet = NULL;
if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
streamQueueProcessFail(pTask->inputQueue);
break;
} else {
batchCnt++;
input = newRet;
batchSize++;
pInput = newRet;
streamQueueProcessSuccess(pTask->inputQueue);
if (batchCnt > STREAM_EXEC_MAX_BATCH_NUM) {
if (batchSize > STREAM_EXEC_MAX_BATCH_NUM) {
break;
}
}
@ -252,75 +263,82 @@ int32_t streamExecForAll(SStreamTask* pTask) {
}
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
if (input) streamFreeQitem(input);
if (pInput) {
streamFreeQitem(pInput);
}
return 0;
}
if (input == NULL) {
if (pInput == NULL) {
break;
}
if (pTask->taskLevel == TASK_LEVEL__SINK) {
ASSERT(((SStreamQueueItem*)input)->type == STREAM_INPUT__DATA_BLOCK);
streamTaskOutput(pTask, input);
ASSERT(((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_BLOCK);
streamTaskOutput(pTask, pInput);
continue;
}
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
qDebug("s-task:%s exec begin, msg batch: %d", pTask->id.idStr, batchSize);
qDebug("stream task:%d exec begin, msg batch: %d", pTask->taskId, batchCnt);
streamTaskExecImpl(pTask, input, pRes);
streamTaskExecImpl(pTask, pInput, pRes);
qDebug("stream task:%d exec end", pTask->taskId);
qDebug("s-task:%s exec end", pTask->id.idStr);
if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
streamFreeQitem(input);
streamFreeQitem(pInput);
return -1;
}
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
if (((SStreamQueueItem*)input)->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit2* pSubmit = (SStreamDataSubmit2*)input;
if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit2* pSubmit = (SStreamDataSubmit2*)pInput;
qRes->childId = pTask->selfChildId;
qRes->sourceVer = pSubmit->ver;
} else if (((SStreamQueueItem*)input)->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)input;
} else if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)pInput;
qRes->childId = pTask->selfChildId;
qRes->sourceVer = pMerged->ver;
}
if (streamTaskOutput(pTask, qRes) < 0) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
streamFreeQitem(input);
streamFreeQitem(pInput);
taosFreeQitem(qRes);
return -1;
}
} else {
taosArrayDestroy(pRes);
}
streamFreeQitem(input);
streamFreeQitem(pInput);
}
return 0;
}
int32_t streamTryExec(SStreamTask* pTask) {
// this function may be executed by multi-threads, so status check is required.
int8_t schedStatus =
atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
if (schedStatus == TASK_SCHED_STATUS__WAITING) {
int32_t code = streamExecForAll(pTask);
if (code < 0) {
atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__FAILED);
return -1;
}
atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE);
if (!taosQueueEmpty(pTask->inputQueue->queue)) {
streamSchedExec(pTask);
}
}
return 0;
}

View File

@ -23,6 +23,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
int32_t len = strlen(path) + 20;
char* streamPath = taosMemoryCalloc(1, len);
sprintf(streamPath, "%s/%s", path, "stream");
@ -83,7 +84,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
taosTmrStop(pTask->timer);
pTask->timer = NULL;
}
tFreeSStreamTask(pTask);
tFreeStreamTask(pTask);
/*streamMetaReleaseTask(pMeta, pTask);*/
}
taosHashCleanup(pMeta->pTasks);
@ -111,12 +112,12 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg,
goto FAIL;
}
if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
goto FAIL;
}
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) {
taosHashRemove(pMeta->pTasks, &pTask->taskId, sizeof(int32_t));
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) {
taosHashRemove(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t));
ASSERT(0);
goto FAIL;
}
@ -124,7 +125,7 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg,
return 0;
FAIL:
if (pTask) tFreeSStreamTask(pTask);
if (pTask) tFreeStreamTask(pTask);
return -1;
}
#endif
@ -147,7 +148,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
tEncodeSStreamTask(&encoder, pTask);
tEncoderClear(&encoder);
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) {
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) {
return -1;
}
@ -165,7 +166,7 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
return -1;
}
taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*));
return 0;
}
@ -207,7 +208,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(left >= 0);
if (left == 0) {
ASSERT(atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING);
tFreeSStreamTask(pTask);
tFreeStreamTask(pTask);
}
}
@ -297,7 +298,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
return -1;
}
if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);

View File

@ -16,7 +16,7 @@
#include "streamInc.h"
int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
qDebug("task %d at node %d launch recover", pTask->taskId, pTask->nodeId);
qDebug("s-task:%s at node %d launch recover", pTask->id.idStr, pTask->nodeId);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_PREPARE);
streamSetParamForRecover(pTask);
@ -56,8 +56,8 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
// checkstatus
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
SStreamTaskCheckReq req = {
.streamId = pTask->streamId,
.upstreamTaskId = pTask->taskId,
.streamId = pTask->id.streamId,
.upstreamTaskId = pTask->id.taskId,
.upstreamNodeId = pTask->nodeId,
.childId = pTask->selfChildId,
};
@ -68,7 +68,7 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
pTask->checkReqId = req.reqId;
qDebug("task %d at node %d check downstream task %d at node %d", pTask->taskId, pTask->nodeId, req.downstreamTaskId,
qDebug("task %d at node %d check downstream task %d at node %d", pTask->id.taskId, pTask->nodeId, req.downstreamTaskId,
req.downstreamNodeId);
streamDispatchOneCheckReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
@ -83,12 +83,12 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
taosArrayPush(pTask->checkReqIds, &req.reqId);
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
qDebug("task %d at node %d check downstream task %d at node %d (shuffle)", pTask->taskId, pTask->nodeId,
qDebug("task %d at node %d check downstream task %d at node %d (shuffle)", pTask->id.taskId, pTask->nodeId,
req.downstreamTaskId, req.downstreamNodeId);
streamDispatchOneCheckReq(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
} else {
qDebug("task %d at node %d direct launch recover since no downstream", pTask->taskId, pTask->nodeId);
qDebug("task %d at node %d direct launch recover since no downstream", pTask->id.taskId, pTask->nodeId);
streamTaskLaunchRecover(pTask, version);
}
return 0;
@ -104,7 +104,7 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp
.downstreamNodeId = pRsp->downstreamNodeId,
.childId = pRsp->childId,
};
qDebug("task %d at node %d check downstream task %d at node %d (recheck)", pTask->taskId, pTask->nodeId,
qDebug("task %d at node %d check downstream task %d at node %d (recheck)", pTask->id.taskId, pTask->nodeId,
req.downstreamTaskId, req.downstreamNodeId);
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
@ -160,11 +160,11 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
// common
int32_t streamSetParamForRecover(SStreamTask* pTask) {
void* exec = pTask->exec.executor;
void* exec = pTask->exec.pExecutor;
return qStreamSetParamForRecover(exec);
}
int32_t streamRestoreParam(SStreamTask* pTask) {
void* exec = pTask->exec.executor;
void* exec = pTask->exec.pExecutor;
return qStreamRestoreParam(exec);
}
int32_t streamSetStatusNormal(SStreamTask* pTask) {
@ -174,14 +174,14 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) {
// source
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver) {
void* exec = pTask->exec.executor;
void* exec = pTask->exec.pExecutor;
return qStreamSourceRecoverStep1(exec, ver);
}
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq) {
pReq->msgHead.vgId = pTask->nodeId;
pReq->streamId = pTask->streamId;
pReq->taskId = pTask->taskId;
pReq->streamId = pTask->id.streamId;
pReq->taskId = pTask->id.taskId;
return 0;
}
@ -192,13 +192,13 @@ int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) {
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq) {
pReq->msgHead.vgId = pTask->nodeId;
pReq->streamId = pTask->streamId;
pReq->taskId = pTask->taskId;
pReq->streamId = pTask->id.streamId;
pReq->taskId = pTask->id.taskId;
return 0;
}
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
void* exec = pTask->exec.executor;
void* exec = pTask->exec.pExecutor;
if (qStreamSourceRecoverStep2(exec, ver) < 0) {
}
return streamScanExec(pTask, 100);
@ -206,7 +206,7 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
SStreamRecoverFinishReq req = {
.streamId = pTask->streamId,
.streamId = pTask->id.streamId,
.childId = pTask->selfChildId,
};
// serialize
@ -227,13 +227,13 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
// agg
int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
void* exec = pTask->exec.executor;
void* exec = pTask->exec.pExecutor;
pTask->recoverWaitingUpstream = taosArrayGetSize(pTask->childEpInfo);
return 0;
}
int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) {
void* exec = pTask->exec.executor;
void* exec = pTask->exec.pExecutor;
if (qStreamRestoreParam(exec) < 0) {
return -1;
}

View File

@ -121,7 +121,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
char statePath[1024];
if (!specPath) {
sprintf(statePath, "%s/%d", path, pTask->taskId);
sprintf(statePath, "%s/%d", path, pTask->id.taskId);
} else {
memset(statePath, 0, 1024);
tstrncpy(statePath, path, 1024);

View File

@ -21,8 +21,14 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
if (pTask == NULL) {
return NULL;
}
pTask->taskId = tGenIdPI32();
pTask->streamId = streamId;
pTask->id.taskId = tGenIdPI32();
pTask->id.streamId = streamId;
char buf[128] = {0};
sprintf(buf, "0x%"PRIx64"-%d", pTask->id.streamId, pTask->id.taskId);
pTask->id.idStr = taosStrdup(buf);
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
@ -50,8 +56,8 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->totalLevel) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->taskLevel) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1;
@ -103,8 +109,8 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->taskLevel) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1;
@ -162,24 +168,43 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
return 0;
}
void tFreeSStreamTask(SStreamTask* pTask) {
qDebug("free stream task %d", pTask->taskId);
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
if (pTask->exec.executor) qDestroyTask(pTask->exec.executor);
void tFreeStreamTask(SStreamTask* pTask) {
qDebug("free s-task:%s", pTask->id.idStr);
if (pTask->inputQueue) {
streamQueueClose(pTask->inputQueue);
}
if (pTask->outputQueue) {
streamQueueClose(pTask->outputQueue);
}
if (pTask->exec.qmsg) {
taosMemoryFree(pTask->exec.qmsg);
}
if (pTask->exec.pExecutor) {
qDestroyTask(pTask->exec.pExecutor);
pTask->exec.pExecutor = NULL;
}
if (pTask->exec.pTqReader != NULL) {
pTask->exec.pTqReader = NULL;
}
taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree);
if (pTask->outputType == TASK_OUTPUT__TABLE) {
tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper);
taosMemoryFree(pTask->tbSink.pTSchema);
}
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
taosArrayDestroy(pTask->checkReqIds);
pTask->checkReqIds = NULL;
}
if (pTask->pState) streamStateClose(pTask->pState);
if (pTask->pState) {
streamStateClose(pTask->pState);
}
taosMemoryFree(pTask);
}