diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index cdcf54bc61..5b1d1fa1bc 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -223,27 +223,12 @@ static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; } -static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) { - int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING); - if (dequeueFlag == STREAM_QUEUE__FAILED) { - ASSERT(queue->qItem != NULL); - return streamQueueCurItem(queue); - } else { - queue->qItem = NULL; - taosGetQitem(queue->qall, &queue->qItem); - if (queue->qItem == NULL) { - taosReadAllQitems(queue->queue, queue->qall); - taosGetQitem(queue->qall, &queue->qItem); - } - return streamQueueCurItem(queue); - } -} +void* streamQueueNextItem(SStreamQueue* queue); SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit); +void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit); -void streamDataSubmitRefDec(SStreamDataSubmit2* pDataSubmit); - -SStreamDataSubmit2* streamSubmitRefClone(SStreamDataSubmit2* pSubmit); +SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit); typedef struct { char* qmsg; @@ -371,43 +356,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId); int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask); void tFreeSStreamTask(SStreamTask* pTask); - -static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) { - int8_t type = pItem->type; - if (type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit2* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit2*)pItem); - if (pSubmitClone == NULL) { - qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask); - terrno = TSDB_CODE_OUT_OF_MEMORY; - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); - return -1; - } - qDebug("task %d %p submit enqueue %p %p %p %d %" PRId64, pTask->taskId, pTask, pItem, pSubmitClone, - pSubmitClone->submit.msgStr, pSubmitClone->submit.msgLen, pSubmitClone->submit.ver); - taosWriteQitem(pTask->inputQueue->queue, pSubmitClone); - // qStreamInput(pTask->exec.executor, pSubmitClone); - } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || - type == STREAM_INPUT__REF_DATA_BLOCK) { - taosWriteQitem(pTask->inputQueue->queue, pItem); - // qStreamInput(pTask->exec.executor, pItem); - } else if (type == STREAM_INPUT__CHECKPOINT) { - taosWriteQitem(pTask->inputQueue->queue, pItem); - // qStreamInput(pTask->exec.executor, pItem); - } else if (type == STREAM_INPUT__GET_RES) { - taosWriteQitem(pTask->inputQueue->queue, pItem); - // qStreamInput(pTask->exec.executor, pItem); - } - - if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { - atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); - } - -#if 0 - // TODO: back pressure - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL); -#endif - return 0; -} +int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem); static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 1f6b205cdf..d05b5418b3 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -61,7 +61,7 @@ typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems); typedef struct STaosQnode STaosQnode; -typedef struct STaosQnode { +struct STaosQnode { STaosQnode *next; STaosQueue *queue; int64_t timestamp; @@ -70,9 +70,9 @@ typedef struct STaosQnode { int8_t itype; int8_t reserved[3]; char item[]; -} STaosQnode; +}; -typedef struct STaosQueue { +struct STaosQueue { STaosQnode *head; STaosQnode *tail; STaosQueue *next; // for queue set @@ -84,22 +84,22 @@ typedef struct STaosQueue { int64_t memOfItems; int32_t numOfItems; int64_t threadId; -} STaosQueue; +}; -typedef struct STaosQset { +struct STaosQset { STaosQueue *head; STaosQueue *current; TdThreadMutex mutex; tsem_t sem; int32_t numOfQueues; int32_t numOfItems; -} STaosQset; +}; -typedef struct STaosQall { +struct STaosQall { STaosQnode *current; STaosQnode *start; int32_t numOfItems; -} STaosQall; +}; STaosQueue *taosOpenQueue(); void taosCloseQueue(STaosQueue *queue); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 59a407656d..3e963dd3e8 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -107,7 +107,6 @@ struct tmq_t { STaosQueue* mqueue; // queue of rsp STaosQall* qall; STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit - TdThreadMutex lock; // used to protect the operation on each topic, when updating the epsets. tsem_t rspSem; }; @@ -188,7 +187,6 @@ typedef struct { SMqClientVg* pVg; SMqClientTopic* pTopic; int32_t vgId; - tsem_t rspSem; uint64_t requestId; // request id for debug purpose } SMqPollCbParam; @@ -979,7 +977,6 @@ void tmqFreeImpl(void* handle) { taosFreeQall(tmq->qall); tsem_destroy(&tmq->rspSem); - taosThreadMutexDestroy(&tmq->lock); taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl); taos_close_internal(tmq->pTscObj); @@ -1024,7 +1021,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->delayedTask = taosOpenQueue(); pTmq->qall = taosAllocateQall(); - taosThreadMutexInit(&pTmq->lock, NULL); if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL || conf->groupId[0] == 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1233,7 +1229,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { - tsem_destroy(&pParam->rspSem); taosMemoryFree(pParam); taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); @@ -1343,8 +1338,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); taosWriteQitem(tmq->mqueue, pRspWrapper); + int32_t total = taosQueueItemSize(tmq->mqueue); tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64, - tmq->consumerId, rspType, vgId, tmq->mqueue->numOfItems, requestId); + tmq->consumerId, rspType, vgId, total, requestId); tsem_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); @@ -1419,7 +1415,7 @@ static void freeClientVgInfo(void* param) { taosArrayDestroy(pTopic->vgs); } -static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { +static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { bool set = false; int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); @@ -1471,14 +1467,11 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { taosHashCleanup(pVgOffsetHashMap); - taosThreadMutexLock(&tmq->lock); // destroy current buffered existed topics info if (tmq->clientTopics) { taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo); } - tmq->clientTopics = newTopics; - taosThreadMutexUnlock(&tmq->lock); int8_t flag = (topicNumGet == 0)? TMQ_CONSUMER_STATUS__NO_TOPIC:TMQ_CONSUMER_STATUS__READY; atomic_store_8(&tmq->status, flag); @@ -1742,7 +1735,7 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) { SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper; SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg; - tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg); + doUpdateLocalEp(tmq, rspWrapper->epoch, rspMsg); /*tmqClearUnhandleMsg(tmq);*/ tDeleteSMqAskEpRsp(rspMsg); *pReset = true; @@ -2163,7 +2156,7 @@ void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* par SMqAskEpRsp rsp; tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp); - tmqUpdateEp(pTmq, head->epoch, &rsp); + doUpdateLocalEp(pTmq, head->epoch, &rsp); tDeleteSMqAskEpRsp(&rsp); } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 6cb89c13ec..84e3424264 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -161,10 +161,10 @@ void *queryThread(void *arg) { return NULL; } -static int32_t numOfThreads = 1; +int32_t numOfThreads = 1; void tmq_commit_cb_print(tmq_t *pTmq, int32_t code, void *param) { - printf("success, code:%d\n", code); + printf("auto commit success, code:%d\n\n\n\n", code); } void* doConsumeData(void* param) { @@ -173,7 +173,7 @@ void* doConsumeData(void* param) { tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); - tmq_conf_set(conf, "group.id", "cgrpName12"); + tmq_conf_set(conf, "group.id", "cgrpName41"); tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); @@ -1060,7 +1060,7 @@ TEST(clientCase, sub_tb_test) { tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); - tmq_conf_set(conf, "group.id", "cgrpName27"); + tmq_conf_set(conf, "group.id", "cgrpName45"); tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d4bdd633e9..5893a4b941 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -819,13 +819,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg pHandle->pRef = pRef; SReadHandle handle = { - .meta = pVnode->pMeta, - .vnode = pVnode, - .initTableReader = true, - .initTqReader = true, - .version = ver, - }; - + .meta = pVnode->pMeta, .vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver}; pHandle->snapshotVer = ver; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { @@ -1338,7 +1332,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { pRefBlock->dataRef = pRef; atomic_add_fetch_32(pRefBlock->dataRef, 1); - if (streamTaskInput(pTask, (SStreamQueueItem*)pRefBlock) < 0) { + if (tAppendDataForStream(pTask, (SStreamQueueItem*)pRefBlock) < 0) { qError("stream task input del failed, task id %d", pTask->taskId); atomic_sub_fetch_32(pRef, 1); @@ -1373,7 +1367,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { taosArrayPush(pStreamBlock->blocks, &block); if (!failed) { - if (streamTaskInput(pTask, (SStreamQueueItem*)pStreamBlock) < 0) { + if (tAppendDataForStream(pTask, (SStreamQueueItem*)pStreamBlock) < 0) { qError("stream task input del failed, task id %d", pTask->taskId); continue; } @@ -1393,15 +1387,14 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { } int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { - void* pIter = NULL; - bool failed = false; - SStreamDataSubmit2* pSubmit = NULL; + void* pIter = NULL; + bool succ = true; - pSubmit = streamDataSubmitNew(submit); + SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit); if (pSubmit == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("failed to create data submit for stream since out of memory"); - failed = true; + succ = false; } while (1) { @@ -1411,22 +1404,27 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { } SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; - if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus); + if (pTask->taskLevel != TASK_LEVEL__SOURCE) { continue; } - tqDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, submit.ver); + if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { + tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->taskId, pTask->taskStatus); + continue; + } - if (!failed) { - if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) { - tqError("stream task input failed, task id %d", pTask->taskId); + tqDebug("data submit enqueue stream task:%d, ver: %" PRId64, pTask->taskId, submit.ver); + if (succ) { + int32_t code = tAppendDataForStream(pTask, (SStreamQueueItem*)pSubmit); + if (code < 0) { + // let's handle the back pressure + + tqError("stream task:%d failed to put into queue for, too many", pTask->taskId); continue; } if (streamSchedExec(pTask) < 0) { - tqError("stream task launch failed, task id %d", pTask->taskId); + tqError("stream task:%d launch failed, code:%s", pTask->taskId, tstrerror(terrno)); continue; } } else { @@ -1434,12 +1432,12 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { } } - if (pSubmit) { - streamDataSubmitRefDec(pSubmit); + if (pSubmit != NULL) { + streamDataSubmitDestroy(pSubmit); taosFreeQitem(pSubmit); } - return failed ? -1 : 0; + return succ ? 0 : -1; } int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 1619829115..62c80c4ce4 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -30,7 +30,7 @@ static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubm // update processed atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver); streamQueueProcessSuccess(&pHandle->pushHandle.inputQ); - streamDataSubmitRefDec(pSubmit); + streamDataSubmitDestroy(pSubmit); if (pRsp->blockNum > 0) { *ppSubmit = pSubmit; return 0; @@ -58,7 +58,7 @@ int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) { } while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) { streamQueueProcessSuccess(&pHandle->pushHandle.inputQ); - streamDataSubmitRefDec(pSubmit); + streamDataSubmitDestroy(pSubmit); pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ); if (pSubmit == NULL) break; } @@ -120,7 +120,7 @@ int32_t tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHan int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) { int8_t inputStatus = atomic_load_8(&pHandle->pushHandle.inputStatus); if (inputStatus == TASK_INPUT_STATUS__NORMAL) { - SStreamDataSubmit* pSubmitClone = streamSubmitRefClone(pSubmit); + SStreamDataSubmit* pSubmitClone = streamSubmitBlockClone(pSubmit); if (pSubmitClone == NULL) { return -1; } @@ -207,33 +207,18 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ #endif typedef struct { - void* pKey; + void* pKey; int64_t keyLen; } SItem; static void recordPushedEntry(SArray* cachedKey, void* pIter); +static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq); static void freeItem(void* param) { SItem* p = (SItem*) param; taosMemoryFree(p->pKey); } -static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) { - int32_t vgId = TD_VID(pTq->pVnode); - int32_t numOfKeys = (int32_t) taosArrayGetSize(pCachedKeys); - - for (int32_t i = 0; i < numOfKeys; i++) { - SItem* pItem = taosArrayGet(pCachedKeys, i); - if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) { - tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*) pItem->pKey); - } - } - - if (numOfKeys > 0) { - tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr)); - } -} - static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData, int32_t dataLen, SArray* pCachedKey) { STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; @@ -347,7 +332,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v void* data = taosMemoryMalloc(len); if (data == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("failed to copy data for stream since out of memory"); + tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", vgId); return -1; } @@ -366,13 +351,6 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v return 0; } -void recordPushedEntry(SArray* cachedKey, void* pIter) { - size_t kLen = 0; - void* key = taosHashGetKey(pIter, &kLen); - SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen}; - taosArrayPush(cachedKey, &item); -} - int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type) { uint64_t consumerId = pRequest->consumerId; @@ -430,3 +408,26 @@ int32_t tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64 return 0; } + +void recordPushedEntry(SArray* cachedKey, void* pIter) { + size_t kLen = 0; + void* key = taosHashGetKey(pIter, &kLen); + SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen}; + taosArrayPush(cachedKey, &item); +} + +void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) { + int32_t vgId = TD_VID(pTq->pVnode); + int32_t numOfKeys = (int32_t) taosArrayGetSize(pCachedKeys); + + for (int32_t i = 0; i < numOfKeys; i++) { + SItem* pItem = taosArrayGet(pCachedKeys, i); + if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) { + tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*) pItem->pKey); + } + } + + if (numOfKeys > 0) { + tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr)); + } +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 718b5979a1..3d2b032156 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -541,6 +541,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return vnodeGetBatchMeta(pVnode, pMsg); case TDMT_VND_TMQ_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); #if 1 diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a6353f722a..db4c3f0b8d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -502,32 +502,12 @@ typedef struct STableCountScanSupp { char stbNameFilter[TSDB_TABLE_NAME_LEN]; } STableCountScanSupp; -typedef struct STableCountScanOperatorInfo { - SReadHandle readHandle; - SSDataBlock* pRes; - - STableCountScanSupp supp; - - int32_t currGrpIdx; - SArray* stbUidList; // when group by db_name and/or stable_name -} STableCountScanOperatorInfo; - typedef struct SOptrBasicInfo { SResultRowInfo resultRowInfo; SSDataBlock* pRes; bool mergeResultBlock; } SOptrBasicInfo; -typedef struct SAggOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - STableQueryInfo* current; - uint64_t groupId; - SGroupResInfo groupResInfo; - SExprSupp scalarExprSup; - bool groupKeyOptimized; -} SAggOperatorInfo; - typedef struct SIntervalAggOperatorInfo { SOptrBasicInfo binfo; // basic info SAggSupporter aggSup; // aggregate supporter diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index a26e3ace7b..d5fc507b94 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -40,6 +40,16 @@ typedef struct { int32_t startOffset; } SFunctionCtxStatus; +typedef struct SAggOperatorInfo { + SOptrBasicInfo binfo; + SAggSupporter aggSup; + STableQueryInfo* current; + uint64_t groupId; + SGroupResInfo groupResInfo; + SExprSupp scalarExprSup; + bool groupKeyOptimized; +} SAggOperatorInfo; + static void destroyAggOperatorInfo(void* param); static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index e1cd509cba..5a7ff42ddf 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -127,12 +127,10 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pOperator->status = OP_NOT_OPENED; SStreamScanInfo* pInfo = pOperator->info; - qDebug("stream set total blocks:%d, task id:%s" PRIx64, (int32_t)numOfBlocks, id); - ASSERT(pInfo->validBlockIndex == 0); - ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0); + qDebug("task stream set total blocks:%d %s", (int32_t)numOfBlocks, id); + ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0); if (type == STREAM_INPUT__MERGED_SUBMIT) { - // ASSERT(numOfBlocks > 1); for (int32_t i = 0; i < numOfBlocks; i++) { SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData)); taosArrayPush(pInfo->pBlockLists, pReq); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1bfa8ccfc4..e66ec49e77 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -33,7 +33,6 @@ int32_t scanDebug = 0; - #define MULTI_READER_MAX_TABLE_NUM 5000 #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) @@ -52,6 +51,16 @@ typedef struct STableMergeScanSortSourceParam { STsdbReader* dataReader; } STableMergeScanSortSourceParam; +typedef struct STableCountScanOperatorInfo { + SReadHandle readHandle; + SSDataBlock* pRes; + + STableCountScanSupp supp; + + int32_t currGrpIdx; + SArray* stbUidList; // when group by db_name and/or stable_name +} STableCountScanOperatorInfo; + static bool processBlockWithProbability(const SSampleExecInfo* pInfo); bool processBlockWithProbability(const SSampleExecInfo* pInfo) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 29acc1fea2..1a1fb6208d 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4758,6 +4758,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(4, sizeof(SWinKey)); } + if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index a4a02b7d65..361cd2cacc 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -68,7 +68,7 @@ void streamSchedByTimer(void* param, void* tmrId) { atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE); - if (streamTaskInput(pTask, (SStreamQueueItem*)trigger) < 0) { + if (tAppendDataForStream(pTask, (SStreamQueueItem*)trigger) < 0) { taosFreeQitem(trigger); taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); return; @@ -92,22 +92,22 @@ int32_t streamSetupTrigger(SStreamTask* pTask) { int32_t streamSchedExec(SStreamTask* pTask) { int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING); + if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE); return -1; } + pRunReq->head.vgId = pTask->nodeId; pRunReq->streamId = pTask->streamId; pRunReq->taskId = pTask->taskId; - SRpcMsg msg = { - .msgType = TDMT_STREAM_TASK_RUN, - .pCont = pRunReq, - .contLen = sizeof(SStreamTaskRunReq), - }; + + SRpcMsg msg = { .msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq) }; tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg); } + return 0; } @@ -123,7 +123,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR /*pData->blocks = pReq->data;*/ /*pBlock->sourceVer = pReq->sourceVer;*/ streamDispatchReqToData(pReq, pData); - if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) { + if (tAppendDataForStream(pTask, (SStreamQueueItem*)pData) == 0) { status = TASK_INPUT_STATUS__NORMAL; } else { status = TASK_INPUT_STATUS__FAILED; @@ -164,7 +164,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, /*pData->blocks = pReq->data;*/ /*pBlock->sourceVer = pReq->sourceVer;*/ streamRetrieveReqToData(pReq, pData); - if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) { + if (tAppendDataForStream(pTask, (SStreamQueueItem*)pData) == 0) { status = TASK_INPUT_STATUS__NORMAL; } else { status = TASK_INPUT_STATUS__FAILED; @@ -275,7 +275,57 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S return 0; } -// int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp) { -// // -// return 0; -// } +int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) { + int8_t type = pItem->type; + + if (type == STREAM_INPUT__DATA_SUBMIT) { + SStreamDataSubmit2* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit2*)pItem); + if (pSubmitBlock == NULL) { + qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask); + terrno = TSDB_CODE_OUT_OF_MEMORY; + atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); + return -1; + } + + int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; + qDebug("stream task:%d %p submit enqueue %p %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->taskId, + pTask, pItem, pSubmitBlock, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, + pSubmitBlock->submit.ver, total); + + taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); + } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || + type == STREAM_INPUT__REF_DATA_BLOCK) { + taosWriteQitem(pTask->inputQueue->queue, pItem); + } else if (type == STREAM_INPUT__CHECKPOINT) { + taosWriteQitem(pTask->inputQueue->queue, pItem); + } else if (type == STREAM_INPUT__GET_RES) { + taosWriteQitem(pTask->inputQueue->queue, pItem); + } + + if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { + atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); + } + +#if 0 + // TODO: back pressure + atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL); +#endif + + return 0; +} + +void* streamQueueNextItem(SStreamQueue* queue) { + int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING); + if (dequeueFlag == STREAM_QUEUE__FAILED) { + ASSERT(queue->qItem != NULL); + return streamQueueCurItem(queue); + } else { + queue->qItem = NULL; + taosGetQitem(queue->qall, &queue->qItem); + if (queue->qItem == NULL) { + taosReadAllQitems(queue->queue, queue->qall); + taosGetQitem(queue->qall, &queue->qItem); + } + return streamQueueCurItem(queue); + } +} \ No newline at end of file diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 8baebaee42..3fba1cb556 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -48,10 +48,12 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock if (pArray == NULL) { return -1; } + taosArrayPush(pArray, &(SSDataBlock){0}); SRetrieveTableRsp* pRetrieve = pReq->pRetrieve; SSDataBlock* pDataBlock = taosArrayGet(pArray, 0); blockDecode(pDataBlock, pRetrieve->data); + // TODO: refactor pDataBlock->info.window.skey = be64toh(pRetrieve->skey); pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); @@ -68,32 +70,51 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit) { SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0); - if (pDataSubmit == NULL) return NULL; + if (pDataSubmit == NULL) { + return NULL; + } + pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t)); - if (pDataSubmit->dataRef == NULL) goto FAIL; + if (pDataSubmit->dataRef == NULL) { + taosFreeQitem(pDataSubmit); + return NULL; + } + pDataSubmit->submit = submit; - *pDataSubmit->dataRef = 1; + *pDataSubmit->dataRef = 1; // initialize the reference count to be 1 pDataSubmit->type = STREAM_INPUT__DATA_SUBMIT; + return pDataSubmit; -FAIL: - taosFreeQitem(pDataSubmit); - return NULL; +} + +void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) { + int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1); + ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT); + + if (ref == 0) { + taosMemoryFree(pDataSubmit->submit.msgStr); + taosMemoryFree(pDataSubmit->dataRef); + } } SStreamMergedSubmit2* streamMergedSubmitNew() { SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM, 0); + if (pMerged == NULL) { + return NULL; + } - if (pMerged == NULL) return NULL; pMerged->submits = taosArrayInit(0, sizeof(SPackedData)); pMerged->dataRefs = taosArrayInit(0, sizeof(void*)); - if (pMerged->dataRefs == NULL || pMerged->submits == NULL) goto FAIL; + + if (pMerged->dataRefs == NULL || pMerged->submits == NULL) { + taosArrayDestroy(pMerged->submits); + taosArrayDestroy(pMerged->dataRefs); + taosFreeQitem(pMerged); + return NULL; + } + pMerged->type = STREAM_INPUT__MERGED_SUBMIT; return pMerged; -FAIL: - if (pMerged->submits) taosArrayDestroy(pMerged->submits); - if (pMerged->dataRefs) taosArrayDestroy(pMerged->dataRefs); - taosFreeQitem(pMerged); - return NULL; } int32_t streamMergeSubmit(SStreamMergedSubmit2* pMerged, SStreamDataSubmit2* pSubmit) { @@ -107,26 +128,17 @@ static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit) atomic_add_fetch_32(pDataSubmit->dataRef, 1); } -SStreamDataSubmit2* streamSubmitRefClone(SStreamDataSubmit2* pSubmit) { +SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) { SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0); - if (pSubmitClone == NULL) { return NULL; } + streamDataSubmitRefInc(pSubmit); memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit2)); return pSubmitClone; } -void streamDataSubmitRefDec(SStreamDataSubmit2* pDataSubmit) { - int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1); - ASSERT(ref >= 0); - if (ref == 0) { - taosMemoryFree(pDataSubmit->submit.msgStr); - taosMemoryFree(pDataSubmit->dataRef); - } -} - SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) { ASSERT(elem); if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) { @@ -164,7 +176,7 @@ void streamFreeQitem(SStreamQueueItem* data) { taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(data); } else if (type == STREAM_INPUT__DATA_SUBMIT) { - streamDataSubmitRefDec((SStreamDataSubmit2*)data); + streamDataSubmitDestroy((SStreamDataSubmit2*)data); taosFreeQitem(data); } else if (type == STREAM_INPUT__MERGED_SUBMIT) { SStreamMergedSubmit2* pMerge = (SStreamMergedSubmit2*)data; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 25b2656365..6ef327049c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -34,7 +34,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data; - qDebug("task %d %p set submit input %p %p %d %" PRId64, pTask->taskId, pTask, pSubmit, pSubmit->submit.msgStr, + qDebug("stream task:%d %p set submit input %p %p %d %" PRId64, pTask->taskId, pTask, pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); qSetMultiStreamInput(exec, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { @@ -268,9 +268,10 @@ int32_t streamExecForAll(SStreamTask* pTask) { SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, batchCnt); + qDebug("stream task:%d exec begin, msg batch: %d", pTask->taskId, batchCnt); streamTaskExecImpl(pTask, input, pRes); - qDebug("stream task %d exec end", pTask->taskId); + + qDebug("stream task:%d exec end", pTask->taskId); if (taosArrayGetSize(pRes) != 0) { SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);