Merge pull request #20745 from taosdata/feature/3_liaohj
refactor: do some internal refactor.
This commit is contained in:
commit
bac65d8ab3
|
@ -223,27 +223,12 @@ static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) {
|
||||||
return queue->qItem;
|
return queue->qItem;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
|
void* streamQueueNextItem(SStreamQueue* queue);
|
||||||
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
|
|
||||||
if (dequeueFlag == STREAM_QUEUE__FAILED) {
|
|
||||||
ASSERT(queue->qItem != NULL);
|
|
||||||
return streamQueueCurItem(queue);
|
|
||||||
} else {
|
|
||||||
queue->qItem = NULL;
|
|
||||||
taosGetQitem(queue->qall, &queue->qItem);
|
|
||||||
if (queue->qItem == NULL) {
|
|
||||||
taosReadAllQitems(queue->queue, queue->qall);
|
|
||||||
taosGetQitem(queue->qall, &queue->qItem);
|
|
||||||
}
|
|
||||||
return streamQueueCurItem(queue);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit);
|
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit);
|
||||||
|
void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit);
|
||||||
|
|
||||||
void streamDataSubmitRefDec(SStreamDataSubmit2* pDataSubmit);
|
SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit);
|
||||||
|
|
||||||
SStreamDataSubmit2* streamSubmitRefClone(SStreamDataSubmit2* pSubmit);
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char* qmsg;
|
char* qmsg;
|
||||||
|
@ -371,43 +356,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId);
|
||||||
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
|
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
|
||||||
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
|
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
|
||||||
void tFreeSStreamTask(SStreamTask* pTask);
|
void tFreeSStreamTask(SStreamTask* pTask);
|
||||||
|
int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem);
|
||||||
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
|
||||||
int8_t type = pItem->type;
|
|
||||||
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
|
||||||
SStreamDataSubmit2* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit2*)pItem);
|
|
||||||
if (pSubmitClone == NULL) {
|
|
||||||
qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask);
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
qDebug("task %d %p submit enqueue %p %p %p %d %" PRId64, pTask->taskId, pTask, pItem, pSubmitClone,
|
|
||||||
pSubmitClone->submit.msgStr, pSubmitClone->submit.msgLen, pSubmitClone->submit.ver);
|
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
|
|
||||||
// qStreamInput(pTask->exec.executor, pSubmitClone);
|
|
||||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
|
||||||
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
|
||||||
// qStreamInput(pTask->exec.executor, pItem);
|
|
||||||
} else if (type == STREAM_INPUT__CHECKPOINT) {
|
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
|
||||||
// qStreamInput(pTask->exec.executor, pItem);
|
|
||||||
} else if (type == STREAM_INPUT__GET_RES) {
|
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
|
||||||
// qStreamInput(pTask->exec.executor, pItem);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
|
|
||||||
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
|
|
||||||
}
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
// TODO: back pressure
|
|
||||||
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
|
|
||||||
#endif
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
|
static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
|
||||||
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
|
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
|
||||||
|
|
|
@ -61,7 +61,7 @@ typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems);
|
||||||
|
|
||||||
typedef struct STaosQnode STaosQnode;
|
typedef struct STaosQnode STaosQnode;
|
||||||
|
|
||||||
typedef struct STaosQnode {
|
struct STaosQnode {
|
||||||
STaosQnode *next;
|
STaosQnode *next;
|
||||||
STaosQueue *queue;
|
STaosQueue *queue;
|
||||||
int64_t timestamp;
|
int64_t timestamp;
|
||||||
|
@ -70,9 +70,9 @@ typedef struct STaosQnode {
|
||||||
int8_t itype;
|
int8_t itype;
|
||||||
int8_t reserved[3];
|
int8_t reserved[3];
|
||||||
char item[];
|
char item[];
|
||||||
} STaosQnode;
|
};
|
||||||
|
|
||||||
typedef struct STaosQueue {
|
struct STaosQueue {
|
||||||
STaosQnode *head;
|
STaosQnode *head;
|
||||||
STaosQnode *tail;
|
STaosQnode *tail;
|
||||||
STaosQueue *next; // for queue set
|
STaosQueue *next; // for queue set
|
||||||
|
@ -84,22 +84,22 @@ typedef struct STaosQueue {
|
||||||
int64_t memOfItems;
|
int64_t memOfItems;
|
||||||
int32_t numOfItems;
|
int32_t numOfItems;
|
||||||
int64_t threadId;
|
int64_t threadId;
|
||||||
} STaosQueue;
|
};
|
||||||
|
|
||||||
typedef struct STaosQset {
|
struct STaosQset {
|
||||||
STaosQueue *head;
|
STaosQueue *head;
|
||||||
STaosQueue *current;
|
STaosQueue *current;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
tsem_t sem;
|
tsem_t sem;
|
||||||
int32_t numOfQueues;
|
int32_t numOfQueues;
|
||||||
int32_t numOfItems;
|
int32_t numOfItems;
|
||||||
} STaosQset;
|
};
|
||||||
|
|
||||||
typedef struct STaosQall {
|
struct STaosQall {
|
||||||
STaosQnode *current;
|
STaosQnode *current;
|
||||||
STaosQnode *start;
|
STaosQnode *start;
|
||||||
int32_t numOfItems;
|
int32_t numOfItems;
|
||||||
} STaosQall;
|
};
|
||||||
|
|
||||||
STaosQueue *taosOpenQueue();
|
STaosQueue *taosOpenQueue();
|
||||||
void taosCloseQueue(STaosQueue *queue);
|
void taosCloseQueue(STaosQueue *queue);
|
||||||
|
|
|
@ -107,7 +107,6 @@ struct tmq_t {
|
||||||
STaosQueue* mqueue; // queue of rsp
|
STaosQueue* mqueue; // queue of rsp
|
||||||
STaosQall* qall;
|
STaosQall* qall;
|
||||||
STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit
|
STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit
|
||||||
TdThreadMutex lock; // used to protect the operation on each topic, when updating the epsets.
|
|
||||||
tsem_t rspSem;
|
tsem_t rspSem;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -188,7 +187,6 @@ typedef struct {
|
||||||
SMqClientVg* pVg;
|
SMqClientVg* pVg;
|
||||||
SMqClientTopic* pTopic;
|
SMqClientTopic* pTopic;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
tsem_t rspSem;
|
|
||||||
uint64_t requestId; // request id for debug purpose
|
uint64_t requestId; // request id for debug purpose
|
||||||
} SMqPollCbParam;
|
} SMqPollCbParam;
|
||||||
|
|
||||||
|
@ -979,7 +977,6 @@ void tmqFreeImpl(void* handle) {
|
||||||
|
|
||||||
taosFreeQall(tmq->qall);
|
taosFreeQall(tmq->qall);
|
||||||
tsem_destroy(&tmq->rspSem);
|
tsem_destroy(&tmq->rspSem);
|
||||||
taosThreadMutexDestroy(&tmq->lock);
|
|
||||||
|
|
||||||
taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl);
|
taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl);
|
||||||
taos_close_internal(tmq->pTscObj);
|
taos_close_internal(tmq->pTscObj);
|
||||||
|
@ -1024,7 +1021,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
pTmq->delayedTask = taosOpenQueue();
|
pTmq->delayedTask = taosOpenQueue();
|
||||||
pTmq->qall = taosAllocateQall();
|
pTmq->qall = taosAllocateQall();
|
||||||
|
|
||||||
taosThreadMutexInit(&pTmq->lock, NULL);
|
|
||||||
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL ||
|
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL ||
|
||||||
conf->groupId[0] == 0) {
|
conf->groupId[0] == 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1233,7 +1229,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
|
|
||||||
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
|
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
|
||||||
if (tmq == NULL) {
|
if (tmq == NULL) {
|
||||||
tsem_destroy(&pParam->rspSem);
|
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
taosMemoryFree(pMsg->pData);
|
taosMemoryFree(pMsg->pData);
|
||||||
taosMemoryFree(pMsg->pEpSet);
|
taosMemoryFree(pMsg->pEpSet);
|
||||||
|
@ -1343,8 +1338,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
taosMemoryFree(pMsg->pData);
|
taosMemoryFree(pMsg->pData);
|
||||||
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
||||||
|
|
||||||
|
int32_t total = taosQueueItemSize(tmq->mqueue);
|
||||||
tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
|
tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
|
||||||
tmq->consumerId, rspType, vgId, tmq->mqueue->numOfItems, requestId);
|
tmq->consumerId, rspType, vgId, total, requestId);
|
||||||
|
|
||||||
tsem_post(&tmq->rspSem);
|
tsem_post(&tmq->rspSem);
|
||||||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||||
|
@ -1419,7 +1415,7 @@ static void freeClientVgInfo(void* param) {
|
||||||
taosArrayDestroy(pTopic->vgs);
|
taosArrayDestroy(pTopic->vgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
|
static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
|
||||||
bool set = false;
|
bool set = false;
|
||||||
|
|
||||||
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
|
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
|
||||||
|
@ -1471,14 +1467,11 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
|
||||||
|
|
||||||
taosHashCleanup(pVgOffsetHashMap);
|
taosHashCleanup(pVgOffsetHashMap);
|
||||||
|
|
||||||
taosThreadMutexLock(&tmq->lock);
|
|
||||||
// destroy current buffered existed topics info
|
// destroy current buffered existed topics info
|
||||||
if (tmq->clientTopics) {
|
if (tmq->clientTopics) {
|
||||||
taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
|
taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq->clientTopics = newTopics;
|
tmq->clientTopics = newTopics;
|
||||||
taosThreadMutexUnlock(&tmq->lock);
|
|
||||||
|
|
||||||
int8_t flag = (topicNumGet == 0)? TMQ_CONSUMER_STATUS__NO_TOPIC:TMQ_CONSUMER_STATUS__READY;
|
int8_t flag = (topicNumGet == 0)? TMQ_CONSUMER_STATUS__NO_TOPIC:TMQ_CONSUMER_STATUS__READY;
|
||||||
atomic_store_8(&tmq->status, flag);
|
atomic_store_8(&tmq->status, flag);
|
||||||
|
@ -1742,7 +1735,7 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
|
||||||
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
|
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
|
||||||
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
|
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
|
||||||
SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
|
SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
|
||||||
tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
|
doUpdateLocalEp(tmq, rspWrapper->epoch, rspMsg);
|
||||||
/*tmqClearUnhandleMsg(tmq);*/
|
/*tmqClearUnhandleMsg(tmq);*/
|
||||||
tDeleteSMqAskEpRsp(rspMsg);
|
tDeleteSMqAskEpRsp(rspMsg);
|
||||||
*pReset = true;
|
*pReset = true;
|
||||||
|
@ -2163,7 +2156,7 @@ void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* par
|
||||||
|
|
||||||
SMqAskEpRsp rsp;
|
SMqAskEpRsp rsp;
|
||||||
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp);
|
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp);
|
||||||
tmqUpdateEp(pTmq, head->epoch, &rsp);
|
doUpdateLocalEp(pTmq, head->epoch, &rsp);
|
||||||
tDeleteSMqAskEpRsp(&rsp);
|
tDeleteSMqAskEpRsp(&rsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -161,10 +161,10 @@ void *queryThread(void *arg) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t numOfThreads = 1;
|
int32_t numOfThreads = 1;
|
||||||
|
|
||||||
void tmq_commit_cb_print(tmq_t *pTmq, int32_t code, void *param) {
|
void tmq_commit_cb_print(tmq_t *pTmq, int32_t code, void *param) {
|
||||||
printf("success, code:%d\n", code);
|
printf("auto commit success, code:%d\n\n\n\n", code);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* doConsumeData(void* param) {
|
void* doConsumeData(void* param) {
|
||||||
|
@ -173,7 +173,7 @@ void* doConsumeData(void* param) {
|
||||||
tmq_conf_t* conf = tmq_conf_new();
|
tmq_conf_t* conf = tmq_conf_new();
|
||||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||||
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||||
tmq_conf_set(conf, "group.id", "cgrpName12");
|
tmq_conf_set(conf, "group.id", "cgrpName41");
|
||||||
tmq_conf_set(conf, "td.connect.user", "root");
|
tmq_conf_set(conf, "td.connect.user", "root");
|
||||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||||
|
@ -1060,7 +1060,7 @@ TEST(clientCase, sub_tb_test) {
|
||||||
tmq_conf_t* conf = tmq_conf_new();
|
tmq_conf_t* conf = tmq_conf_new();
|
||||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||||
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||||
tmq_conf_set(conf, "group.id", "cgrpName27");
|
tmq_conf_set(conf, "group.id", "cgrpName45");
|
||||||
tmq_conf_set(conf, "td.connect.user", "root");
|
tmq_conf_set(conf, "td.connect.user", "root");
|
||||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||||
|
|
|
@ -819,13 +819,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
pHandle->pRef = pRef;
|
pHandle->pRef = pRef;
|
||||||
|
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {
|
||||||
.meta = pVnode->pMeta,
|
.meta = pVnode->pMeta, .vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver};
|
||||||
.vnode = pVnode,
|
|
||||||
.initTableReader = true,
|
|
||||||
.initTqReader = true,
|
|
||||||
.version = ver,
|
|
||||||
};
|
|
||||||
|
|
||||||
pHandle->snapshotVer = ver;
|
pHandle->snapshotVer = ver;
|
||||||
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
@ -1338,7 +1332,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
||||||
pRefBlock->dataRef = pRef;
|
pRefBlock->dataRef = pRef;
|
||||||
atomic_add_fetch_32(pRefBlock->dataRef, 1);
|
atomic_add_fetch_32(pRefBlock->dataRef, 1);
|
||||||
|
|
||||||
if (streamTaskInput(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
|
if (tAppendDataForStream(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
|
||||||
qError("stream task input del failed, task id %d", pTask->taskId);
|
qError("stream task input del failed, task id %d", pTask->taskId);
|
||||||
|
|
||||||
atomic_sub_fetch_32(pRef, 1);
|
atomic_sub_fetch_32(pRef, 1);
|
||||||
|
@ -1373,7 +1367,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
||||||
taosArrayPush(pStreamBlock->blocks, &block);
|
taosArrayPush(pStreamBlock->blocks, &block);
|
||||||
|
|
||||||
if (!failed) {
|
if (!failed) {
|
||||||
if (streamTaskInput(pTask, (SStreamQueueItem*)pStreamBlock) < 0) {
|
if (tAppendDataForStream(pTask, (SStreamQueueItem*)pStreamBlock) < 0) {
|
||||||
qError("stream task input del failed, task id %d", pTask->taskId);
|
qError("stream task input del failed, task id %d", pTask->taskId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1393,15 +1387,14 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
|
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
bool failed = false;
|
bool succ = true;
|
||||||
SStreamDataSubmit2* pSubmit = NULL;
|
|
||||||
|
|
||||||
pSubmit = streamDataSubmitNew(submit);
|
SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit);
|
||||||
if (pSubmit == NULL) {
|
if (pSubmit == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tqError("failed to create data submit for stream since out of memory");
|
tqError("failed to create data submit for stream since out of memory");
|
||||||
failed = true;
|
succ = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -1411,22 +1404,27 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||||
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
|
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
|
||||||
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
|
|
||||||
tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, submit.ver);
|
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
|
||||||
|
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->taskId, pTask->taskStatus);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (!failed) {
|
tqDebug("data submit enqueue stream task:%d, ver: %" PRId64, pTask->taskId, submit.ver);
|
||||||
if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
|
if (succ) {
|
||||||
tqError("stream task input failed, task id %d", pTask->taskId);
|
int32_t code = tAppendDataForStream(pTask, (SStreamQueueItem*)pSubmit);
|
||||||
|
if (code < 0) {
|
||||||
|
// let's handle the back pressure
|
||||||
|
|
||||||
|
tqError("stream task:%d failed to put into queue for, too many", pTask->taskId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamSchedExec(pTask) < 0) {
|
if (streamSchedExec(pTask) < 0) {
|
||||||
tqError("stream task launch failed, task id %d", pTask->taskId);
|
tqError("stream task:%d launch failed, code:%s", pTask->taskId, tstrerror(terrno));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1434,12 +1432,12 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSubmit) {
|
if (pSubmit != NULL) {
|
||||||
streamDataSubmitRefDec(pSubmit);
|
streamDataSubmitDestroy(pSubmit);
|
||||||
taosFreeQitem(pSubmit);
|
taosFreeQitem(pSubmit);
|
||||||
}
|
}
|
||||||
|
|
||||||
return failed ? -1 : 0;
|
return succ ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
|
@ -30,7 +30,7 @@ static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubm
|
||||||
// update processed
|
// update processed
|
||||||
atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver);
|
atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver);
|
||||||
streamQueueProcessSuccess(&pHandle->pushHandle.inputQ);
|
streamQueueProcessSuccess(&pHandle->pushHandle.inputQ);
|
||||||
streamDataSubmitRefDec(pSubmit);
|
streamDataSubmitDestroy(pSubmit);
|
||||||
if (pRsp->blockNum > 0) {
|
if (pRsp->blockNum > 0) {
|
||||||
*ppSubmit = pSubmit;
|
*ppSubmit = pSubmit;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -58,7 +58,7 @@ int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
|
||||||
}
|
}
|
||||||
while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) {
|
while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) {
|
||||||
streamQueueProcessSuccess(&pHandle->pushHandle.inputQ);
|
streamQueueProcessSuccess(&pHandle->pushHandle.inputQ);
|
||||||
streamDataSubmitRefDec(pSubmit);
|
streamDataSubmitDestroy(pSubmit);
|
||||||
pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ);
|
pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ);
|
||||||
if (pSubmit == NULL) break;
|
if (pSubmit == NULL) break;
|
||||||
}
|
}
|
||||||
|
@ -120,7 +120,7 @@ int32_t tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHan
|
||||||
int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) {
|
int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) {
|
||||||
int8_t inputStatus = atomic_load_8(&pHandle->pushHandle.inputStatus);
|
int8_t inputStatus = atomic_load_8(&pHandle->pushHandle.inputStatus);
|
||||||
if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
|
if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
|
||||||
SStreamDataSubmit* pSubmitClone = streamSubmitRefClone(pSubmit);
|
SStreamDataSubmit* pSubmitClone = streamSubmitBlockClone(pSubmit);
|
||||||
if (pSubmitClone == NULL) {
|
if (pSubmitClone == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -207,33 +207,18 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
void* pKey;
|
void* pKey;
|
||||||
int64_t keyLen;
|
int64_t keyLen;
|
||||||
} SItem;
|
} SItem;
|
||||||
|
|
||||||
static void recordPushedEntry(SArray* cachedKey, void* pIter);
|
static void recordPushedEntry(SArray* cachedKey, void* pIter);
|
||||||
|
static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq);
|
||||||
|
|
||||||
static void freeItem(void* param) {
|
static void freeItem(void* param) {
|
||||||
SItem* p = (SItem*) param;
|
SItem* p = (SItem*) param;
|
||||||
taosMemoryFree(p->pKey);
|
taosMemoryFree(p->pKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) {
|
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
|
||||||
int32_t numOfKeys = (int32_t) taosArrayGetSize(pCachedKeys);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfKeys; i++) {
|
|
||||||
SItem* pItem = taosArrayGet(pCachedKeys, i);
|
|
||||||
if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) {
|
|
||||||
tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*) pItem->pKey);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (numOfKeys > 0) {
|
|
||||||
tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData,
|
static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData,
|
||||||
int32_t dataLen, SArray* pCachedKey) {
|
int32_t dataLen, SArray* pCachedKey) {
|
||||||
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
|
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
|
||||||
|
@ -347,7 +332,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
|
||||||
void* data = taosMemoryMalloc(len);
|
void* data = taosMemoryMalloc(len);
|
||||||
if (data == NULL) {
|
if (data == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tqError("failed to copy data for stream since out of memory");
|
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", vgId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -366,13 +351,6 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void recordPushedEntry(SArray* cachedKey, void* pIter) {
|
|
||||||
size_t kLen = 0;
|
|
||||||
void* key = taosHashGetKey(pIter, &kLen);
|
|
||||||
SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen};
|
|
||||||
taosArrayPush(cachedKey, &item);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg,
|
int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg,
|
||||||
SMqDataRsp* pDataRsp, int32_t type) {
|
SMqDataRsp* pDataRsp, int32_t type) {
|
||||||
uint64_t consumerId = pRequest->consumerId;
|
uint64_t consumerId = pRequest->consumerId;
|
||||||
|
@ -430,3 +408,26 @@ int32_t tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void recordPushedEntry(SArray* cachedKey, void* pIter) {
|
||||||
|
size_t kLen = 0;
|
||||||
|
void* key = taosHashGetKey(pIter, &kLen);
|
||||||
|
SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen};
|
||||||
|
taosArrayPush(cachedKey, &item);
|
||||||
|
}
|
||||||
|
|
||||||
|
void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) {
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
int32_t numOfKeys = (int32_t) taosArrayGetSize(pCachedKeys);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfKeys; i++) {
|
||||||
|
SItem* pItem = taosArrayGet(pCachedKeys, i);
|
||||||
|
if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) {
|
||||||
|
tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*) pItem->pKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (numOfKeys > 0) {
|
||||||
|
tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -541,6 +541,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
return vnodeGetBatchMeta(pVnode, pMsg);
|
return vnodeGetBatchMeta(pVnode, pMsg);
|
||||||
case TDMT_VND_TMQ_CONSUME:
|
case TDMT_VND_TMQ_CONSUME:
|
||||||
return tqProcessPollReq(pVnode->pTq, pMsg);
|
return tqProcessPollReq(pVnode->pTq, pMsg);
|
||||||
|
|
||||||
case TDMT_STREAM_TASK_RUN:
|
case TDMT_STREAM_TASK_RUN:
|
||||||
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
|
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
|
||||||
#if 1
|
#if 1
|
||||||
|
|
|
@ -502,32 +502,12 @@ typedef struct STableCountScanSupp {
|
||||||
char stbNameFilter[TSDB_TABLE_NAME_LEN];
|
char stbNameFilter[TSDB_TABLE_NAME_LEN];
|
||||||
} STableCountScanSupp;
|
} STableCountScanSupp;
|
||||||
|
|
||||||
typedef struct STableCountScanOperatorInfo {
|
|
||||||
SReadHandle readHandle;
|
|
||||||
SSDataBlock* pRes;
|
|
||||||
|
|
||||||
STableCountScanSupp supp;
|
|
||||||
|
|
||||||
int32_t currGrpIdx;
|
|
||||||
SArray* stbUidList; // when group by db_name and/or stable_name
|
|
||||||
} STableCountScanOperatorInfo;
|
|
||||||
|
|
||||||
typedef struct SOptrBasicInfo {
|
typedef struct SOptrBasicInfo {
|
||||||
SResultRowInfo resultRowInfo;
|
SResultRowInfo resultRowInfo;
|
||||||
SSDataBlock* pRes;
|
SSDataBlock* pRes;
|
||||||
bool mergeResultBlock;
|
bool mergeResultBlock;
|
||||||
} SOptrBasicInfo;
|
} SOptrBasicInfo;
|
||||||
|
|
||||||
typedef struct SAggOperatorInfo {
|
|
||||||
SOptrBasicInfo binfo;
|
|
||||||
SAggSupporter aggSup;
|
|
||||||
STableQueryInfo* current;
|
|
||||||
uint64_t groupId;
|
|
||||||
SGroupResInfo groupResInfo;
|
|
||||||
SExprSupp scalarExprSup;
|
|
||||||
bool groupKeyOptimized;
|
|
||||||
} SAggOperatorInfo;
|
|
||||||
|
|
||||||
typedef struct SIntervalAggOperatorInfo {
|
typedef struct SIntervalAggOperatorInfo {
|
||||||
SOptrBasicInfo binfo; // basic info
|
SOptrBasicInfo binfo; // basic info
|
||||||
SAggSupporter aggSup; // aggregate supporter
|
SAggSupporter aggSup; // aggregate supporter
|
||||||
|
|
|
@ -40,6 +40,16 @@ typedef struct {
|
||||||
int32_t startOffset;
|
int32_t startOffset;
|
||||||
} SFunctionCtxStatus;
|
} SFunctionCtxStatus;
|
||||||
|
|
||||||
|
typedef struct SAggOperatorInfo {
|
||||||
|
SOptrBasicInfo binfo;
|
||||||
|
SAggSupporter aggSup;
|
||||||
|
STableQueryInfo* current;
|
||||||
|
uint64_t groupId;
|
||||||
|
SGroupResInfo groupResInfo;
|
||||||
|
SExprSupp scalarExprSup;
|
||||||
|
bool groupKeyOptimized;
|
||||||
|
} SAggOperatorInfo;
|
||||||
|
|
||||||
static void destroyAggOperatorInfo(void* param);
|
static void destroyAggOperatorInfo(void* param);
|
||||||
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
|
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
|
||||||
|
|
||||||
|
|
|
@ -127,12 +127,10 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
|
||||||
SStreamScanInfo* pInfo = pOperator->info;
|
SStreamScanInfo* pInfo = pOperator->info;
|
||||||
qDebug("stream set total blocks:%d, task id:%s" PRIx64, (int32_t)numOfBlocks, id);
|
qDebug("task stream set total blocks:%d %s", (int32_t)numOfBlocks, id);
|
||||||
ASSERT(pInfo->validBlockIndex == 0);
|
ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0);
|
||||||
ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0);
|
|
||||||
|
|
||||||
if (type == STREAM_INPUT__MERGED_SUBMIT) {
|
if (type == STREAM_INPUT__MERGED_SUBMIT) {
|
||||||
// ASSERT(numOfBlocks > 1);
|
|
||||||
for (int32_t i = 0; i < numOfBlocks; i++) {
|
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||||
SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
|
SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
|
||||||
taosArrayPush(pInfo->pBlockLists, pReq);
|
taosArrayPush(pInfo->pBlockLists, pReq);
|
||||||
|
|
|
@ -33,7 +33,6 @@
|
||||||
|
|
||||||
int32_t scanDebug = 0;
|
int32_t scanDebug = 0;
|
||||||
|
|
||||||
|
|
||||||
#define MULTI_READER_MAX_TABLE_NUM 5000
|
#define MULTI_READER_MAX_TABLE_NUM 5000
|
||||||
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
|
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
|
||||||
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||||
|
@ -52,6 +51,16 @@ typedef struct STableMergeScanSortSourceParam {
|
||||||
STsdbReader* dataReader;
|
STsdbReader* dataReader;
|
||||||
} STableMergeScanSortSourceParam;
|
} STableMergeScanSortSourceParam;
|
||||||
|
|
||||||
|
typedef struct STableCountScanOperatorInfo {
|
||||||
|
SReadHandle readHandle;
|
||||||
|
SSDataBlock* pRes;
|
||||||
|
|
||||||
|
STableCountScanSupp supp;
|
||||||
|
|
||||||
|
int32_t currGrpIdx;
|
||||||
|
SArray* stbUidList; // when group by db_name and/or stable_name
|
||||||
|
} STableCountScanOperatorInfo;
|
||||||
|
|
||||||
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
|
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
|
||||||
|
|
||||||
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
|
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
|
||||||
|
|
|
@ -4758,6 +4758,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
if (!pInfo->pUpdated) {
|
if (!pInfo->pUpdated) {
|
||||||
pInfo->pUpdated = taosArrayInit(4, sizeof(SWinKey));
|
pInfo->pUpdated = taosArrayInit(4, sizeof(SWinKey));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pInfo->pUpdatedMap) {
|
if (!pInfo->pUpdatedMap) {
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
|
pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
|
||||||
|
|
|
@ -68,7 +68,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
|
||||||
|
|
||||||
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
|
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
|
||||||
|
|
||||||
if (streamTaskInput(pTask, (SStreamQueueItem*)trigger) < 0) {
|
if (tAppendDataForStream(pTask, (SStreamQueueItem*)trigger) < 0) {
|
||||||
taosFreeQitem(trigger);
|
taosFreeQitem(trigger);
|
||||||
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
|
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
|
||||||
return;
|
return;
|
||||||
|
@ -92,22 +92,22 @@ int32_t streamSetupTrigger(SStreamTask* pTask) {
|
||||||
int32_t streamSchedExec(SStreamTask* pTask) {
|
int32_t streamSchedExec(SStreamTask* pTask) {
|
||||||
int8_t schedStatus =
|
int8_t schedStatus =
|
||||||
atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING);
|
atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING);
|
||||||
|
|
||||||
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
|
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
|
||||||
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
||||||
if (pRunReq == NULL) {
|
if (pRunReq == NULL) {
|
||||||
atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRunReq->head.vgId = pTask->nodeId;
|
pRunReq->head.vgId = pTask->nodeId;
|
||||||
pRunReq->streamId = pTask->streamId;
|
pRunReq->streamId = pTask->streamId;
|
||||||
pRunReq->taskId = pTask->taskId;
|
pRunReq->taskId = pTask->taskId;
|
||||||
SRpcMsg msg = {
|
|
||||||
.msgType = TDMT_STREAM_TASK_RUN,
|
SRpcMsg msg = { .msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq) };
|
||||||
.pCont = pRunReq,
|
|
||||||
.contLen = sizeof(SStreamTaskRunReq),
|
|
||||||
};
|
|
||||||
tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
|
tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,7 +123,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR
|
||||||
/*pData->blocks = pReq->data;*/
|
/*pData->blocks = pReq->data;*/
|
||||||
/*pBlock->sourceVer = pReq->sourceVer;*/
|
/*pBlock->sourceVer = pReq->sourceVer;*/
|
||||||
streamDispatchReqToData(pReq, pData);
|
streamDispatchReqToData(pReq, pData);
|
||||||
if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) {
|
if (tAppendDataForStream(pTask, (SStreamQueueItem*)pData) == 0) {
|
||||||
status = TASK_INPUT_STATUS__NORMAL;
|
status = TASK_INPUT_STATUS__NORMAL;
|
||||||
} else {
|
} else {
|
||||||
status = TASK_INPUT_STATUS__FAILED;
|
status = TASK_INPUT_STATUS__FAILED;
|
||||||
|
@ -164,7 +164,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
|
||||||
/*pData->blocks = pReq->data;*/
|
/*pData->blocks = pReq->data;*/
|
||||||
/*pBlock->sourceVer = pReq->sourceVer;*/
|
/*pBlock->sourceVer = pReq->sourceVer;*/
|
||||||
streamRetrieveReqToData(pReq, pData);
|
streamRetrieveReqToData(pReq, pData);
|
||||||
if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) {
|
if (tAppendDataForStream(pTask, (SStreamQueueItem*)pData) == 0) {
|
||||||
status = TASK_INPUT_STATUS__NORMAL;
|
status = TASK_INPUT_STATUS__NORMAL;
|
||||||
} else {
|
} else {
|
||||||
status = TASK_INPUT_STATUS__FAILED;
|
status = TASK_INPUT_STATUS__FAILED;
|
||||||
|
@ -275,7 +275,57 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp) {
|
int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
// //
|
int8_t type = pItem->type;
|
||||||
// return 0;
|
|
||||||
// }
|
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
|
SStreamDataSubmit2* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit2*)pItem);
|
||||||
|
if (pSubmitBlock == NULL) {
|
||||||
|
qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1;
|
||||||
|
qDebug("stream task:%d %p submit enqueue %p %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->taskId,
|
||||||
|
pTask, pItem, pSubmitBlock, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen,
|
||||||
|
pSubmitBlock->submit.ver, total);
|
||||||
|
|
||||||
|
taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock);
|
||||||
|
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
||||||
|
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
|
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
|
} else if (type == STREAM_INPUT__CHECKPOINT) {
|
||||||
|
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
|
} else if (type == STREAM_INPUT__GET_RES) {
|
||||||
|
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
|
||||||
|
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
|
||||||
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
// TODO: back pressure
|
||||||
|
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* streamQueueNextItem(SStreamQueue* queue) {
|
||||||
|
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
|
||||||
|
if (dequeueFlag == STREAM_QUEUE__FAILED) {
|
||||||
|
ASSERT(queue->qItem != NULL);
|
||||||
|
return streamQueueCurItem(queue);
|
||||||
|
} else {
|
||||||
|
queue->qItem = NULL;
|
||||||
|
taosGetQitem(queue->qall, &queue->qItem);
|
||||||
|
if (queue->qItem == NULL) {
|
||||||
|
taosReadAllQitems(queue->queue, queue->qall);
|
||||||
|
taosGetQitem(queue->qall, &queue->qItem);
|
||||||
|
}
|
||||||
|
return streamQueueCurItem(queue);
|
||||||
|
}
|
||||||
|
}
|
|
@ -48,10 +48,12 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
|
||||||
if (pArray == NULL) {
|
if (pArray == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pArray, &(SSDataBlock){0});
|
taosArrayPush(pArray, &(SSDataBlock){0});
|
||||||
SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
|
SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
|
||||||
SSDataBlock* pDataBlock = taosArrayGet(pArray, 0);
|
SSDataBlock* pDataBlock = taosArrayGet(pArray, 0);
|
||||||
blockDecode(pDataBlock, pRetrieve->data);
|
blockDecode(pDataBlock, pRetrieve->data);
|
||||||
|
|
||||||
// TODO: refactor
|
// TODO: refactor
|
||||||
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
|
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
|
||||||
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
|
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
|
||||||
|
@ -68,32 +70,51 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
|
||||||
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit) {
|
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit) {
|
||||||
SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0);
|
SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0);
|
||||||
|
|
||||||
if (pDataSubmit == NULL) return NULL;
|
if (pDataSubmit == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
|
pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
|
||||||
if (pDataSubmit->dataRef == NULL) goto FAIL;
|
if (pDataSubmit->dataRef == NULL) {
|
||||||
|
taosFreeQitem(pDataSubmit);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
pDataSubmit->submit = submit;
|
pDataSubmit->submit = submit;
|
||||||
*pDataSubmit->dataRef = 1;
|
*pDataSubmit->dataRef = 1; // initialize the reference count to be 1
|
||||||
pDataSubmit->type = STREAM_INPUT__DATA_SUBMIT;
|
pDataSubmit->type = STREAM_INPUT__DATA_SUBMIT;
|
||||||
|
|
||||||
return pDataSubmit;
|
return pDataSubmit;
|
||||||
FAIL:
|
}
|
||||||
taosFreeQitem(pDataSubmit);
|
|
||||||
return NULL;
|
void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) {
|
||||||
|
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
|
||||||
|
ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);
|
||||||
|
|
||||||
|
if (ref == 0) {
|
||||||
|
taosMemoryFree(pDataSubmit->submit.msgStr);
|
||||||
|
taosMemoryFree(pDataSubmit->dataRef);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamMergedSubmit2* streamMergedSubmitNew() {
|
SStreamMergedSubmit2* streamMergedSubmitNew() {
|
||||||
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM, 0);
|
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM, 0);
|
||||||
|
if (pMerged == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
if (pMerged == NULL) return NULL;
|
|
||||||
pMerged->submits = taosArrayInit(0, sizeof(SPackedData));
|
pMerged->submits = taosArrayInit(0, sizeof(SPackedData));
|
||||||
pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
|
pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
|
||||||
if (pMerged->dataRefs == NULL || pMerged->submits == NULL) goto FAIL;
|
|
||||||
|
if (pMerged->dataRefs == NULL || pMerged->submits == NULL) {
|
||||||
|
taosArrayDestroy(pMerged->submits);
|
||||||
|
taosArrayDestroy(pMerged->dataRefs);
|
||||||
|
taosFreeQitem(pMerged);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
|
pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
|
||||||
return pMerged;
|
return pMerged;
|
||||||
FAIL:
|
|
||||||
if (pMerged->submits) taosArrayDestroy(pMerged->submits);
|
|
||||||
if (pMerged->dataRefs) taosArrayDestroy(pMerged->dataRefs);
|
|
||||||
taosFreeQitem(pMerged);
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMergeSubmit(SStreamMergedSubmit2* pMerged, SStreamDataSubmit2* pSubmit) {
|
int32_t streamMergeSubmit(SStreamMergedSubmit2* pMerged, SStreamDataSubmit2* pSubmit) {
|
||||||
|
@ -107,26 +128,17 @@ static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit)
|
||||||
atomic_add_fetch_32(pDataSubmit->dataRef, 1);
|
atomic_add_fetch_32(pDataSubmit->dataRef, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamDataSubmit2* streamSubmitRefClone(SStreamDataSubmit2* pSubmit) {
|
SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) {
|
||||||
SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0);
|
SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0);
|
||||||
|
|
||||||
if (pSubmitClone == NULL) {
|
if (pSubmitClone == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
streamDataSubmitRefInc(pSubmit);
|
streamDataSubmitRefInc(pSubmit);
|
||||||
memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit2));
|
memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit2));
|
||||||
return pSubmitClone;
|
return pSubmitClone;
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamDataSubmitRefDec(SStreamDataSubmit2* pDataSubmit) {
|
|
||||||
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
|
|
||||||
ASSERT(ref >= 0);
|
|
||||||
if (ref == 0) {
|
|
||||||
taosMemoryFree(pDataSubmit->submit.msgStr);
|
|
||||||
taosMemoryFree(pDataSubmit->dataRef);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
|
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
|
||||||
ASSERT(elem);
|
ASSERT(elem);
|
||||||
if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) {
|
if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) {
|
||||||
|
@ -164,7 +176,7 @@ void streamFreeQitem(SStreamQueueItem* data) {
|
||||||
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
|
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
|
||||||
taosFreeQitem(data);
|
taosFreeQitem(data);
|
||||||
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
|
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
streamDataSubmitRefDec((SStreamDataSubmit2*)data);
|
streamDataSubmitDestroy((SStreamDataSubmit2*)data);
|
||||||
taosFreeQitem(data);
|
taosFreeQitem(data);
|
||||||
} else if (type == STREAM_INPUT__MERGED_SUBMIT) {
|
} else if (type == STREAM_INPUT__MERGED_SUBMIT) {
|
||||||
SStreamMergedSubmit2* pMerge = (SStreamMergedSubmit2*)data;
|
SStreamMergedSubmit2* pMerge = (SStreamMergedSubmit2*)data;
|
||||||
|
|
|
@ -34,7 +34,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
|
||||||
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
|
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
|
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
|
||||||
const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data;
|
const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data;
|
||||||
qDebug("task %d %p set submit input %p %p %d %" PRId64, pTask->taskId, pTask, pSubmit, pSubmit->submit.msgStr,
|
qDebug("stream task:%d %p set submit input %p %p %d %" PRId64, pTask->taskId, pTask, pSubmit, pSubmit->submit.msgStr,
|
||||||
pSubmit->submit.msgLen, pSubmit->submit.ver);
|
pSubmit->submit.msgLen, pSubmit->submit.ver);
|
||||||
qSetMultiStreamInput(exec, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
|
qSetMultiStreamInput(exec, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
|
||||||
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||||
|
@ -268,9 +268,10 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
|
|
||||||
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||||
|
|
||||||
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, batchCnt);
|
qDebug("stream task:%d exec begin, msg batch: %d", pTask->taskId, batchCnt);
|
||||||
streamTaskExecImpl(pTask, input, pRes);
|
streamTaskExecImpl(pTask, input, pRes);
|
||||||
qDebug("stream task %d exec end", pTask->taskId);
|
|
||||||
|
qDebug("stream task:%d exec end", pTask->taskId);
|
||||||
|
|
||||||
if (taosArrayGetSize(pRes) != 0) {
|
if (taosArrayGetSize(pRes) != 0) {
|
||||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
||||||
|
|
Loading…
Reference in New Issue