multiple level stream schedule
This commit is contained in:
parent
c83a07277f
commit
e0f192046c
|
@ -25,7 +25,7 @@ int32_t init_env() {
|
|||
return -1;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
|
|
@ -217,7 +217,6 @@ DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
|
|||
|
||||
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
|
||||
DLL_EXPORT tmq_t *tmq_consumer_new1(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
|
||||
DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message);
|
||||
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
|
||||
|
||||
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
|
||||
|
@ -258,7 +257,8 @@ int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
|
|||
|
||||
DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message);
|
||||
DLL_EXPORT char *tmq_get_topic_name(tmq_message_t *message);
|
||||
DLL_EXPORT char *tmq_get_topic_schema(tmq_t *tmq, const char *topic);
|
||||
DLL_EXPORT void *tmq_get_topic_schema(tmq_t *tmq, const char *topic);
|
||||
DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message);
|
||||
|
||||
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
|
||||
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
|
||||
|
|
|
@ -62,10 +62,11 @@ typedef struct {
|
|||
} STaskExec;
|
||||
|
||||
typedef struct {
|
||||
int8_t reserved;
|
||||
int32_t taskId;
|
||||
} STaskDispatcherInplace;
|
||||
|
||||
typedef struct {
|
||||
int32_t taskId;
|
||||
int32_t nodeId;
|
||||
SEpSet epSet;
|
||||
} STaskDispatcherFixedEp;
|
||||
|
|
|
@ -186,23 +186,23 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
}
|
||||
}
|
||||
|
||||
if (strcmp(key, "connection.ip") == 0) {
|
||||
if (strcmp(key, "td.connect.ip") == 0) {
|
||||
conf->ip = strdup(value);
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
if (strcmp(key, "connection.user") == 0) {
|
||||
if (strcmp(key, "td.connect.user") == 0) {
|
||||
conf->user = strdup(value);
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
if (strcmp(key, "connection.pass") == 0) {
|
||||
if (strcmp(key, "td.connect.pass") == 0) {
|
||||
conf->pass = strdup(value);
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
if (strcmp(key, "connection.port") == 0) {
|
||||
if (strcmp(key, "td.connect.port") == 0) {
|
||||
conf->port = atoi(value);
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
if (strcmp(key, "connection.db") == 0) {
|
||||
if (strcmp(key, "td.connect.db") == 0) {
|
||||
conf->db = strdup(value);
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
@ -223,13 +223,13 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) {
|
|||
}
|
||||
|
||||
void tmq_list_destroy(tmq_list_t* list) {
|
||||
SArray* container = (SArray*)list;
|
||||
SArray* container = &list->container;
|
||||
/*taosArrayDestroy(container);*/
|
||||
taosArrayDestroyEx(container, (void (*)(void*))taosMemoryFree);
|
||||
}
|
||||
|
||||
void tmqClearUnhandleMsg(tmq_t* tmq) {
|
||||
tmq_message_t* msg;
|
||||
tmq_message_t* msg = NULL;
|
||||
while (1) {
|
||||
taosGetQitem(tmq->qall, (void**)&msg);
|
||||
if (msg)
|
||||
|
@ -807,7 +807,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
SMqClientVg* pVg = pParam->pVg;
|
||||
tmq_t* tmq = pParam->tmq;
|
||||
if (code != 0) {
|
||||
printf("msg discard %x\n", code);
|
||||
printf("msg discard, code:%x\n", code);
|
||||
goto WRITE_QUEUE_FAIL;
|
||||
}
|
||||
|
||||
|
@ -877,10 +877,10 @@ WRITE_QUEUE_FAIL:
|
|||
}
|
||||
|
||||
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
|
||||
printf("call update ep %d\n", epoch);
|
||||
bool set = false;
|
||||
int32_t sz = taosArrayGetSize(pRsp->topics);
|
||||
if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
|
||||
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
|
||||
SArray* newTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqClientTopic topic = {0};
|
||||
SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
|
||||
|
@ -899,8 +899,10 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
|
|||
taosArrayPush(topic.vgs, &clientVg);
|
||||
set = true;
|
||||
}
|
||||
taosArrayPush(tmq->clientTopics, &topic);
|
||||
taosArrayPush(newTopics, &topic);
|
||||
}
|
||||
if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
|
||||
tmq->clientTopics = newTopics;
|
||||
atomic_store_32(&tmq->epoch, epoch);
|
||||
return set;
|
||||
}
|
||||
|
@ -1219,6 +1221,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
|
|||
if (rspMsg->msg.head.epoch == atomic_load_32(&tmq->epoch)) {
|
||||
/*printf("epoch match\n");*/
|
||||
SMqClientVg* pVg = rspMsg->vg;
|
||||
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
|
||||
pVg->currentOffset = rspMsg->msg.rspOffset;
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
return rspMsg;
|
||||
|
|
|
@ -160,6 +160,24 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
|
|||
}
|
||||
}
|
||||
|
||||
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SNodeMsg *pMsg = NULL;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, (void **)&pMsg);
|
||||
|
||||
dTrace("msg:%p, will be processed in vnode-merge queue", pMsg);
|
||||
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
|
||||
if (code != 0) {
|
||||
vmSendRsp(pVnode->pWrapper, pMsg, code);
|
||||
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
||||
rpcFreeCont(pMsg->rpcMsg.pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) {
|
||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||
int32_t code = -1;
|
||||
|
@ -308,7 +326,7 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
|
|||
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
|
||||
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
|
||||
pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeMsg);
|
||||
pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue);
|
||||
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
|
||||
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
|
||||
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
||||
|
|
|
@ -185,6 +185,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;
|
||||
pTask->dispatchType = TASK_DISPATCH__FIXED;
|
||||
|
||||
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
|
||||
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
|
||||
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
|
||||
}
|
||||
|
|
|
@ -42,8 +42,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STq
|
|||
// TODO: error code of buffer pool
|
||||
}
|
||||
#endif
|
||||
pTq->tqMeta =
|
||||
tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, (FTqDelete)taosMemoryFree, 0);
|
||||
pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
|
||||
(FTqDelete)taosMemoryFree, 0);
|
||||
if (pTq->tqMeta == NULL) {
|
||||
taosMemoryFree(pTq);
|
||||
#if 0
|
||||
|
@ -498,12 +498,16 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
|
|||
}
|
||||
|
||||
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) {
|
||||
SStreamTaskExecReq* pReq = msg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
char* msgstr = POINTER_SHIFT(msg->pCont, sizeof(SMsgHead));
|
||||
|
||||
SStreamTaskExecReq req;
|
||||
tDecodeSStreamTaskExecReq(msgstr, &req);
|
||||
|
||||
int32_t taskId = req.taskId;
|
||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
ASSERT(pTask);
|
||||
|
||||
if (streamExecTask(pTask, &pTq->pVnode->msgCb, pReq->data, STREAM_DATA_TYPE_SSDATA_BLOCK, 0) < 0) {
|
||||
if (streamExecTask(pTask, &pTq->pVnode->msgCb, req.data, STREAM_DATA_TYPE_SSDATA_BLOCK, 0) < 0) {
|
||||
// TODO
|
||||
}
|
||||
return 0;
|
||||
|
|
|
@ -66,6 +66,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
case TDMT_VND_CONSUME:
|
||||
return tqProcessPollReq(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_TASK_EXEC:
|
||||
case TDMT_VND_TASK_PIPE_EXEC:
|
||||
case TDMT_VND_TASK_MERGE_EXEC:
|
||||
return tqProcessTaskExec(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_STREAM_TRIGGER:
|
||||
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen);
|
||||
|
|
|
@ -121,7 +121,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
|
|||
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
||||
SStreamTaskExecReq req = {
|
||||
.streamId = pTask->streamId,
|
||||
.taskId = pTask->taskId,
|
||||
.taskId = pTask->fixedEpDispatcher.taskId,
|
||||
.data = pRes,
|
||||
};
|
||||
|
||||
|
@ -211,8 +211,9 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
||||
if (tEncodeI8(pEncoder, pTask->inplaceDispatcher.reserved) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pTask->inplaceDispatcher.taskId) < 0) return -1;
|
||||
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
||||
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
|
||||
if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
|
||||
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||
|
@ -248,8 +249,9 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
||||
if (tDecodeI8(pDecoder, &pTask->inplaceDispatcher.reserved) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pTask->inplaceDispatcher.taskId) < 0) return -1;
|
||||
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
||||
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
|
||||
if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
|
||||
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||
|
|
|
@ -19,13 +19,13 @@
|
|||
#include "tref.h"
|
||||
#include "walInt.h"
|
||||
|
||||
int64_t inline walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }
|
||||
int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }
|
||||
|
||||
int64_t inline walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotVer; }
|
||||
int64_t FORCE_INLINE walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotVer; }
|
||||
|
||||
int64_t inline walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }
|
||||
int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }
|
||||
|
||||
static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
|
||||
static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
|
||||
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
|
||||
}
|
||||
|
||||
|
@ -46,7 +46,7 @@ void* tmemmem(char* haystack, int hlen, char* needle, int nlen) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static inline int64_t walScanLogGetLastVer(SWal* pWal) {
|
||||
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
|
||||
ASSERT(pWal->fileInfoSet != NULL);
|
||||
int sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
ASSERT(sz > 0);
|
||||
|
|
|
@ -74,9 +74,9 @@ int walSetWrite(SWal* pWal) {
|
|||
}
|
||||
|
||||
int walChangeWrite(SWal* pWal, int64_t ver) {
|
||||
int code = 0;
|
||||
int code;
|
||||
TdFilePtr pIdxTFile, pLogTFile;
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
if (pWal->pWriteLogTFile != NULL) {
|
||||
code = taosCloseFile(&pWal->pWriteLogTFile);
|
||||
if (code != 0) {
|
||||
|
@ -133,7 +133,6 @@ int walSeekWriteVer(SWal* pWal, int64_t ver) {
|
|||
return -1;
|
||||
}
|
||||
if (ver < pWal->vers.snapshotVer) {
|
||||
|
||||
}
|
||||
if (ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
|
||||
code = walChangeWrite(pWal, ver);
|
||||
|
|
|
@ -314,7 +314,7 @@ int32_t init_env() {
|
|||
}
|
||||
|
||||
//const char* sql = "select * from tu1";
|
||||
sprintf(sqlStr, "create topic test_stb_topic_1 as select * from %s0", g_stConfInfo.stbName);
|
||||
sprintf(sqlStr, "create topic test_stb_topic_1 as select ts,c0 from %s", g_stConfInfo.stbName);
|
||||
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/
|
||||
pRes = taos_query(pConn, sqlStr);
|
||||
if (taos_errno(pRes) != 0) {
|
||||
|
@ -351,36 +351,6 @@ tmq_list_t* build_topic_list() {
|
|||
return topic_list;
|
||||
}
|
||||
|
||||
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||
tmq_resp_err_t err;
|
||||
|
||||
if ((err = tmq_subscribe(tmq, topics))) {
|
||||
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
|
||||
printf("subscribe err\n");
|
||||
return;
|
||||
}
|
||||
int32_t cnt = 0;
|
||||
/*clock_t startTime = clock();*/
|
||||
while (running) {
|
||||
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1);
|
||||
if (tmqmessage) {
|
||||
cnt++;
|
||||
msg_process(tmqmessage);
|
||||
tmq_message_destroy(tmqmessage);
|
||||
/*} else {*/
|
||||
/*break;*/
|
||||
}
|
||||
}
|
||||
/*clock_t endTime = clock();*/
|
||||
/*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/
|
||||
|
||||
err = tmq_consumer_close(tmq);
|
||||
if (err)
|
||||
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
|
||||
else
|
||||
fprintf(stderr, "%% Consumer closed\n");
|
||||
}
|
||||
|
||||
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||
static const int MIN_COMMIT_COUNT = 1000;
|
||||
|
||||
|
@ -438,7 +408,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
|
|||
|
||||
if (batchCnt != totalMsgs) {
|
||||
printf("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC);
|
||||
exit(-1);
|
||||
/*exit(-1);*/
|
||||
}
|
||||
|
||||
if (0 == g_stConfInfo.simCase) {
|
||||
|
@ -693,8 +663,8 @@ int main(int32_t argc, char *argv[]) {
|
|||
|
||||
walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath);
|
||||
if (walLogSize <= 0) {
|
||||
printf("vnode2/wal size incorrect!");
|
||||
/*exit(-1);*/
|
||||
printf("vnode2/wal size incorrect!\n");
|
||||
/*exit(-1);*/
|
||||
} else {
|
||||
if (0 == g_stConfInfo.simCase) {
|
||||
pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize/(1024 * 1024.0));
|
||||
|
|
Loading…
Reference in New Issue