Merge remote-tracking branch 'origin/feature/stream' into feature/stream
This commit is contained in:
commit
347498235d
|
@ -494,7 +494,6 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
|
|||
tmqAskEp(tmq, true);
|
||||
taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
|
||||
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
|
||||
/*tmq_commit(tmq, NULL, true);*/
|
||||
tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
|
||||
taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
|
||||
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
|
||||
|
@ -667,94 +666,6 @@ FAIL:
|
|||
|
||||
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
|
||||
return tmqCommitInner(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
|
||||
#if 0
|
||||
// TODO: add read write lock
|
||||
SRequestObj* pRequest = NULL;
|
||||
tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS;
|
||||
// build msg
|
||||
// send to mnode
|
||||
SMqCMCommitOffsetReq req;
|
||||
SArray* pOffsets = NULL;
|
||||
|
||||
if (offsets == NULL) {
|
||||
pOffsets = taosArrayInit(0, sizeof(SMqOffset));
|
||||
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
SMqOffset offset;
|
||||
strcpy(offset.topicName, pTopic->topicName);
|
||||
strcpy(offset.cgroup, tmq->groupId);
|
||||
offset.vgId = pVg->vgId;
|
||||
offset.offset = pVg->currentOffset;
|
||||
taosArrayPush(pOffsets, &offset);
|
||||
}
|
||||
}
|
||||
req.num = pOffsets->size;
|
||||
req.offsets = pOffsets->pData;
|
||||
} else {
|
||||
req.num = taosArrayGetSize(&offsets->container);
|
||||
req.offsets = (SMqOffset*)offsets->container.pData;
|
||||
}
|
||||
|
||||
SEncoder encoder;
|
||||
|
||||
tEncoderInit(&encoder, NULL, 0);
|
||||
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
|
||||
int32_t tlen = encoder.pos;
|
||||
void* buf = taosMemoryMalloc(tlen);
|
||||
if (buf == NULL) {
|
||||
tEncoderClear(&encoder);
|
||||
return -1;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
tEncoderInit(&encoder, buf, tlen);
|
||||
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
|
||||
if (pRequest == NULL) {
|
||||
tscError("failed to malloc request");
|
||||
}
|
||||
|
||||
SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
|
||||
if (pParam == NULL) {
|
||||
return -1;
|
||||
}
|
||||
pParam->tmq = tmq;
|
||||
tsem_init(&pParam->rspSem, 0, 0);
|
||||
pParam->async = async;
|
||||
pParam->offsets = pOffsets;
|
||||
|
||||
pRequest->body.requestMsg = (SDataBuf){
|
||||
.pData = buf,
|
||||
.len = tlen,
|
||||
.handle = NULL,
|
||||
};
|
||||
|
||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
||||
sendInfo->requestObjRefId = 0;
|
||||
sendInfo->param = pParam;
|
||||
sendInfo->fp = tmqCommitCb;
|
||||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
|
||||
if (!async) {
|
||||
tsem_wait(&pParam->rspSem);
|
||||
resp = pParam->rspErr;
|
||||
tsem_destroy(&pParam->rspSem);
|
||||
taosMemoryFree(pParam);
|
||||
|
||||
if (pOffsets) {
|
||||
taosArrayDestroy(pOffsets);
|
||||
}
|
||||
}
|
||||
|
||||
return resp;
|
||||
#endif
|
||||
}
|
||||
|
||||
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||
|
@ -859,93 +770,6 @@ void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* para
|
|||
conf->commitCbUserParam = param;
|
||||
}
|
||||
|
||||
#if 0
|
||||
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
|
||||
STscObj* pTscObj = (STscObj*)taos;
|
||||
SRequestObj* pRequest = NULL;
|
||||
SQuery* pQueryNode = NULL;
|
||||
char* astStr = NULL;
|
||||
int32_t sqlLen;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
if (taos == NULL || streamName == NULL || sql == NULL) {
|
||||
tscError("invalid parameters for creating stream, connObj:%p, stream name:%s, sql:%s", taos, streamName, sql);
|
||||
terrno = TSDB_CODE_TSC_INVALID_INPUT;
|
||||
goto _return;
|
||||
}
|
||||
sqlLen = strlen(sql);
|
||||
|
||||
if (strlen(tbName) >= TSDB_TABLE_NAME_LEN) {
|
||||
tscError("output tb name too long, max length:%d", TSDB_TABLE_NAME_LEN - 1);
|
||||
terrno = TSDB_CODE_TSC_INVALID_INPUT;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
|
||||
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
|
||||
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
tscDebug("start to create stream: %s", streamName);
|
||||
|
||||
int32_t code = 0;
|
||||
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
||||
CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode, NULL), _return);
|
||||
CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);
|
||||
|
||||
/*printf("%s\n", pStr);*/
|
||||
|
||||
SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
|
||||
strcpy(name.dbname, pRequest->pDb);
|
||||
strcpy(name.tname, streamName);
|
||||
|
||||
SCMCreateStreamReq req = {
|
||||
.igExists = 1,
|
||||
.ast = astStr,
|
||||
.sql = (char*)sql,
|
||||
};
|
||||
tNameExtractFullName(&name, req.name);
|
||||
strcpy(req.targetStbFullName, tbName);
|
||||
|
||||
int tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
|
||||
void* buf = taosMemoryMalloc(tlen);
|
||||
if (buf == NULL) {
|
||||
goto _return;
|
||||
}
|
||||
|
||||
tSerializeSCMCreateStreamReq(buf, tlen, &req);
|
||||
|
||||
pRequest->body.requestMsg = (SDataBuf){
|
||||
.pData = buf,
|
||||
.len = tlen,
|
||||
.handle = NULL,
|
||||
};
|
||||
pRequest->type = TDMT_MND_CREATE_STREAM;
|
||||
|
||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
||||
SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
|
||||
_return:
|
||||
taosMemoryFreeClear(astStr);
|
||||
qDestroyQuery(pQueryNode);
|
||||
/*if (sendInfo != NULL) {*/
|
||||
/*destroySendMsgInfo(sendInfo);*/
|
||||
/*}*/
|
||||
|
||||
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
|
||||
pRequest->code = terrno;
|
||||
}
|
||||
|
||||
return pRequest;
|
||||
}
|
||||
#endif
|
||||
|
||||
#if 0
|
||||
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
|
||||
if (tmq_message == NULL) return 0;
|
||||
|
@ -1540,10 +1364,11 @@ const char* tmq_get_table_name(TAOS_RES* res) {
|
|||
}
|
||||
return NULL;
|
||||
}
|
||||
DLL_EXPORT void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) {
|
||||
|
||||
void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) {
|
||||
tmqCommitInner(tmq, offsets, 0, 1, cb, param);
|
||||
}
|
||||
|
||||
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
|
||||
tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
|
||||
return tmqCommitInner(tmq, offsets, 0, 0, NULL, NULL);
|
||||
}
|
||||
|
|
|
@ -78,7 +78,18 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
|
|||
// 2.1. idle: exec
|
||||
// 2.2. executing: return
|
||||
// 2.3. closing: keep trying
|
||||
streamExec(pTask, pMsgCb);
|
||||
if (pTask->execType != TASK_EXEC__NONE) {
|
||||
streamExec(pTask, pMsgCb);
|
||||
} else {
|
||||
ASSERT(pTask->sinkType != TASK_SINK__NONE);
|
||||
while (1) {
|
||||
void* data = streamQueueNextItem(pTask->inputQueue);
|
||||
if (data == NULL) return 0;
|
||||
if (streamTaskOutput(pTask, data) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 3. handle output
|
||||
// 3.1 check and set status
|
||||
|
|
|
@ -76,7 +76,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
|
|||
streamDataSubmitRefDec((SStreamDataSubmit*)data);
|
||||
taosFreeQitem(data);
|
||||
} else {
|
||||
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
|
||||
/*taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);*/
|
||||
taosFreeQitem(data);
|
||||
}
|
||||
streamQueueProcessSuccess(pTask->inputQueue);
|
||||
|
|
Loading…
Reference in New Issue