refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-04-03 17:47:48 +08:00
parent 08bd021a6f
commit 3ef7d43dae
13 changed files with 115 additions and 127 deletions

View File

@ -371,43 +371,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask); void tFreeSStreamTask(SStreamTask* pTask);
int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem);
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
int8_t type = pItem->type;
if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit2* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit2*)pItem);
if (pSubmitClone == NULL) {
qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask);
terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
return -1;
}
qDebug("task %d %p submit enqueue %p %p %p %d %" PRId64, pTask->taskId, pTask, pItem, pSubmitClone,
pSubmitClone->submit.msgStr, pSubmitClone->submit.msgLen, pSubmitClone->submit.ver);
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
// qStreamInput(pTask->exec.executor, pSubmitClone);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
} else if (type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
} else if (type == STREAM_INPUT__GET_RES) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
}
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
}
#if 0
// TODO: back pressure
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
#endif
return 0;
}
static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) { static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);

View File

@ -107,7 +107,6 @@ struct tmq_t {
STaosQueue* mqueue; // queue of rsp STaosQueue* mqueue; // queue of rsp
STaosQall* qall; STaosQall* qall;
STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit
TdThreadMutex lock; // used to protect the operation on each topic, when updating the epsets.
tsem_t rspSem; tsem_t rspSem;
}; };
@ -188,7 +187,6 @@ typedef struct {
SMqClientVg* pVg; SMqClientVg* pVg;
SMqClientTopic* pTopic; SMqClientTopic* pTopic;
int32_t vgId; int32_t vgId;
tsem_t rspSem;
uint64_t requestId; // request id for debug purpose uint64_t requestId; // request id for debug purpose
} SMqPollCbParam; } SMqPollCbParam;
@ -979,7 +977,6 @@ void tmqFreeImpl(void* handle) {
taosFreeQall(tmq->qall); taosFreeQall(tmq->qall);
tsem_destroy(&tmq->rspSem); tsem_destroy(&tmq->rspSem);
taosThreadMutexDestroy(&tmq->lock);
taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl); taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl);
taos_close_internal(tmq->pTscObj); taos_close_internal(tmq->pTscObj);
@ -1024,7 +1021,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->delayedTask = taosOpenQueue(); pTmq->delayedTask = taosOpenQueue();
pTmq->qall = taosAllocateQall(); pTmq->qall = taosAllocateQall();
taosThreadMutexInit(&pTmq->lock, NULL);
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL || if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL ||
conf->groupId[0] == 0) { conf->groupId[0] == 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -1233,7 +1229,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) { if (tmq == NULL) {
tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam); taosMemoryFree(pParam);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
@ -1419,7 +1414,7 @@ static void freeClientVgInfo(void* param) {
taosArrayDestroy(pTopic->vgs); taosArrayDestroy(pTopic->vgs);
} }
static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
bool set = false; bool set = false;
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
@ -1471,14 +1466,11 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
taosHashCleanup(pVgOffsetHashMap); taosHashCleanup(pVgOffsetHashMap);
taosThreadMutexLock(&tmq->lock);
// destroy current buffered existed topics info // destroy current buffered existed topics info
if (tmq->clientTopics) { if (tmq->clientTopics) {
taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo); taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
} }
tmq->clientTopics = newTopics; tmq->clientTopics = newTopics;
taosThreadMutexUnlock(&tmq->lock);
int8_t flag = (topicNumGet == 0)? TMQ_CONSUMER_STATUS__NO_TOPIC:TMQ_CONSUMER_STATUS__READY; int8_t flag = (topicNumGet == 0)? TMQ_CONSUMER_STATUS__NO_TOPIC:TMQ_CONSUMER_STATUS__READY;
atomic_store_8(&tmq->status, flag); atomic_store_8(&tmq->status, flag);
@ -1742,7 +1734,7 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) { if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper; SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg; SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg); doUpdateLocalEp(tmq, rspWrapper->epoch, rspMsg);
/*tmqClearUnhandleMsg(tmq);*/ /*tmqClearUnhandleMsg(tmq);*/
tDeleteSMqAskEpRsp(rspMsg); tDeleteSMqAskEpRsp(rspMsg);
*pReset = true; *pReset = true;
@ -2163,7 +2155,7 @@ void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* par
SMqAskEpRsp rsp; SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp); tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp);
tmqUpdateEp(pTmq, head->epoch, &rsp); doUpdateLocalEp(pTmq, head->epoch, &rsp);
tDeleteSMqAskEpRsp(&rsp); tDeleteSMqAskEpRsp(&rsp);
} }

View File

@ -819,13 +819,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
pHandle->pRef = pRef; pHandle->pRef = pRef;
SReadHandle handle = { SReadHandle handle = {
.meta = pVnode->pMeta, .meta = pVnode->pMeta, .vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver};
.vnode = pVnode,
.initTableReader = true,
.initTqReader = true,
.version = ver,
};
pHandle->snapshotVer = ver; pHandle->snapshotVer = ver;
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
@ -1393,11 +1387,10 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
} }
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
void* pIter = NULL; void* pIter = NULL;
bool failed = false; bool failed = false;
SStreamDataSubmit2* pSubmit = NULL;
pSubmit = streamDataSubmitNew(submit); SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit);
if (pSubmit == NULL) { if (pSubmit == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to create data submit for stream since out of memory"); tqError("failed to create data submit for stream since out of memory");
@ -1411,7 +1404,10 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
} }
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
continue;
}
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus); tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus);
continue; continue;

View File

@ -207,33 +207,18 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif #endif
typedef struct { typedef struct {
void* pKey; void* pKey;
int64_t keyLen; int64_t keyLen;
} SItem; } SItem;
static void recordPushedEntry(SArray* cachedKey, void* pIter); static void recordPushedEntry(SArray* cachedKey, void* pIter);
static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq);
static void freeItem(void* param) { static void freeItem(void* param) {
SItem* p = (SItem*) param; SItem* p = (SItem*) param;
taosMemoryFree(p->pKey); taosMemoryFree(p->pKey);
} }
static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfKeys = (int32_t) taosArrayGetSize(pCachedKeys);
for (int32_t i = 0; i < numOfKeys; i++) {
SItem* pItem = taosArrayGet(pCachedKeys, i);
if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) {
tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*) pItem->pKey);
}
}
if (numOfKeys > 0) {
tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr));
}
}
static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData, static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData,
int32_t dataLen, SArray* pCachedKey) { int32_t dataLen, SArray* pCachedKey) {
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
@ -347,7 +332,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
void* data = taosMemoryMalloc(len); void* data = taosMemoryMalloc(len);
if (data == NULL) { if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to copy data for stream since out of memory"); tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", vgId);
return -1; return -1;
} }
@ -366,13 +351,6 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
return 0; return 0;
} }
void recordPushedEntry(SArray* cachedKey, void* pIter) {
size_t kLen = 0;
void* key = taosHashGetKey(pIter, &kLen);
SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen};
taosArrayPush(cachedKey, &item);
}
int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg,
SMqDataRsp* pDataRsp, int32_t type) { SMqDataRsp* pDataRsp, int32_t type) {
uint64_t consumerId = pRequest->consumerId; uint64_t consumerId = pRequest->consumerId;
@ -430,3 +408,26 @@ int32_t tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64
return 0; return 0;
} }
void recordPushedEntry(SArray* cachedKey, void* pIter) {
size_t kLen = 0;
void* key = taosHashGetKey(pIter, &kLen);
SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen};
taosArrayPush(cachedKey, &item);
}
void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfKeys = (int32_t) taosArrayGetSize(pCachedKeys);
for (int32_t i = 0; i < numOfKeys; i++) {
SItem* pItem = taosArrayGet(pCachedKeys, i);
if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) {
tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*) pItem->pKey);
}
}
if (numOfKeys > 0) {
tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr));
}
}

View File

@ -541,6 +541,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return vnodeGetBatchMeta(pVnode, pMsg); return vnodeGetBatchMeta(pVnode, pMsg);
case TDMT_VND_TMQ_CONSUME: case TDMT_VND_TMQ_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg); return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN: case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg); return tqProcessTaskRunReq(pVnode->pTq, pMsg);
#if 1 #if 1

View File

@ -502,32 +502,12 @@ typedef struct STableCountScanSupp {
char stbNameFilter[TSDB_TABLE_NAME_LEN]; char stbNameFilter[TSDB_TABLE_NAME_LEN];
} STableCountScanSupp; } STableCountScanSupp;
typedef struct STableCountScanOperatorInfo {
SReadHandle readHandle;
SSDataBlock* pRes;
STableCountScanSupp supp;
int32_t currGrpIdx;
SArray* stbUidList; // when group by db_name and/or stable_name
} STableCountScanOperatorInfo;
typedef struct SOptrBasicInfo { typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo; SResultRowInfo resultRowInfo;
SSDataBlock* pRes; SSDataBlock* pRes;
bool mergeResultBlock; bool mergeResultBlock;
} SOptrBasicInfo; } SOptrBasicInfo;
typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
STableQueryInfo* current;
uint64_t groupId;
SGroupResInfo groupResInfo;
SExprSupp scalarExprSup;
bool groupKeyOptimized;
} SAggOperatorInfo;
typedef struct SIntervalAggOperatorInfo { typedef struct SIntervalAggOperatorInfo {
SOptrBasicInfo binfo; // basic info SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter SAggSupporter aggSup; // aggregate supporter

View File

@ -40,6 +40,16 @@ typedef struct {
int32_t startOffset; int32_t startOffset;
} SFunctionCtxStatus; } SFunctionCtxStatus;
typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
STableQueryInfo* current;
uint64_t groupId;
SGroupResInfo groupResInfo;
SExprSupp scalarExprSup;
bool groupKeyOptimized;
} SAggOperatorInfo;
static void destroyAggOperatorInfo(void* param); static void destroyAggOperatorInfo(void* param);
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);

View File

@ -127,12 +127,10 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
qDebug("stream set total blocks:%d, task id:%s" PRIx64, (int32_t)numOfBlocks, id); qDebug("task stream set total blocks:%d %s", (int32_t)numOfBlocks, id);
ASSERT(pInfo->validBlockIndex == 0); ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0);
ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0);
if (type == STREAM_INPUT__MERGED_SUBMIT) { if (type == STREAM_INPUT__MERGED_SUBMIT) {
// ASSERT(numOfBlocks > 1);
for (int32_t i = 0; i < numOfBlocks; i++) { for (int32_t i = 0; i < numOfBlocks; i++) {
SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData)); SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
taosArrayPush(pInfo->pBlockLists, pReq); taosArrayPush(pInfo->pBlockLists, pReq);

View File

@ -33,7 +33,6 @@
int32_t scanDebug = 0; int32_t scanDebug = 0;
#define MULTI_READER_MAX_TABLE_NUM 5000 #define MULTI_READER_MAX_TABLE_NUM 5000
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
@ -52,6 +51,16 @@ typedef struct STableMergeScanSortSourceParam {
STsdbReader* dataReader; STsdbReader* dataReader;
} STableMergeScanSortSourceParam; } STableMergeScanSortSourceParam;
typedef struct STableCountScanOperatorInfo {
SReadHandle readHandle;
SSDataBlock* pRes;
STableCountScanSupp supp;
int32_t currGrpIdx;
SArray* stbUidList; // when group by db_name and/or stable_name
} STableCountScanOperatorInfo;
static bool processBlockWithProbability(const SSampleExecInfo* pInfo); static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
bool processBlockWithProbability(const SSampleExecInfo* pInfo) { bool processBlockWithProbability(const SSampleExecInfo* pInfo) {

View File

@ -4758,6 +4758,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
if (!pInfo->pUpdated) { if (!pInfo->pUpdated) {
pInfo->pUpdated = taosArrayInit(4, sizeof(SWinKey)); pInfo->pUpdated = taosArrayInit(4, sizeof(SWinKey));
} }
if (!pInfo->pUpdatedMap) { if (!pInfo->pUpdatedMap) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn); pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);

View File

@ -92,22 +92,22 @@ int32_t streamSetupTrigger(SStreamTask* pTask) {
int32_t streamSchedExec(SStreamTask* pTask) { int32_t streamSchedExec(SStreamTask* pTask) {
int8_t schedStatus = int8_t schedStatus =
atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING); atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) { if (pRunReq == NULL) {
atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE);
return -1; return -1;
} }
pRunReq->head.vgId = pTask->nodeId; pRunReq->head.vgId = pTask->nodeId;
pRunReq->streamId = pTask->streamId; pRunReq->streamId = pTask->streamId;
pRunReq->taskId = pTask->taskId; pRunReq->taskId = pTask->taskId;
SRpcMsg msg = {
.msgType = TDMT_STREAM_TASK_RUN, SRpcMsg msg = { .msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq) };
.pCont = pRunReq,
.contLen = sizeof(SStreamTaskRunReq),
};
tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg); tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
} }
return 0; return 0;
} }
@ -275,7 +275,38 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
return 0; return 0;
} }
// int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp) { int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
// // int8_t type = pItem->type;
// return 0;
// } if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit2* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit2*)pItem);
if (pSubmitClone == NULL) {
qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask);
terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
return -1;
}
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
qDebug("stream task:%d %p submit enqueue %p %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->taskId,
pTask, pItem, pSubmitClone, pSubmitClone->submit.msgStr, pSubmitClone->submit.msgLen,
pSubmitClone->submit.ver, pTask->inputQueue->queue->numOfItems);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (type == STREAM_INPUT__GET_RES) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
}
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
}
#if 0
// TODO: back pressure
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
#endif
return 0;
}

View File

@ -68,16 +68,20 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit) { SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit) {
SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0); SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0);
if (pDataSubmit == NULL) return NULL; if (pDataSubmit == NULL) {
return NULL;
}
pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t)); pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
if (pDataSubmit->dataRef == NULL) goto FAIL; if (pDataSubmit->dataRef == NULL) {
taosFreeQitem(pDataSubmit);
return NULL;
}
pDataSubmit->submit = submit; pDataSubmit->submit = submit;
*pDataSubmit->dataRef = 1; *pDataSubmit->dataRef = 1;
pDataSubmit->type = STREAM_INPUT__DATA_SUBMIT; pDataSubmit->type = STREAM_INPUT__DATA_SUBMIT;
return pDataSubmit; return pDataSubmit;
FAIL:
taosFreeQitem(pDataSubmit);
return NULL;
} }
SStreamMergedSubmit2* streamMergedSubmitNew() { SStreamMergedSubmit2* streamMergedSubmitNew() {

View File

@ -34,7 +34,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data; const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data;
qDebug("task %d %p set submit input %p %p %d %" PRId64, pTask->taskId, pTask, pSubmit, pSubmit->submit.msgStr, qDebug("stream task:%d %p set submit input %p %p %d %" PRId64, pTask->taskId, pTask, pSubmit, pSubmit->submit.msgStr,
pSubmit->submit.msgLen, pSubmit->submit.ver); pSubmit->submit.msgLen, pSubmit->submit.ver);
qSetMultiStreamInput(exec, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qSetMultiStreamInput(exec, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
@ -268,9 +268,10 @@ int32_t streamExecForAll(SStreamTask* pTask) {
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, batchCnt); qDebug("stream task:%d exec begin, msg batch: %d", pTask->taskId, batchCnt);
streamTaskExecImpl(pTask, input, pRes); streamTaskExecImpl(pTask, input, pRes);
qDebug("stream task %d exec end", pTask->taskId);
qDebug("stream task:%d exec end", pTask->taskId);
if (taosArrayGetSize(pRes) != 0) { if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);