diff --git a/include/client/taos.h b/include/client/taos.h index e9f7f88387..61538e392a 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -209,15 +209,6 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLi /* --------------------------TMQ INTERFACE------------------------------- */ -#if 0 -enum { - TMQ_RESP_ERR__FAIL = -1, - TMQ_RESP_ERR__SUCCESS = 0, -}; - -typedef int32_t tmq_resp_err_t; -#endif - typedef struct tmq_t tmq_t; typedef struct tmq_conf_t tmq_conf_t; typedef struct tmq_list_t tmq_list_t; @@ -236,15 +227,13 @@ DLL_EXPORT const char *tmq_err2str(int32_t code); /* ------------------------TMQ CONSUMER INTERFACE------------------------ */ -DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); -DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq); -DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); -// timeout: -1 means infinitely waiting +DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); +DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq); +DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout); DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); - -DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); -DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); +DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); +DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); /* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 712b4fcf42..3c3d3e953d 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1713,6 +1713,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks } char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) { + ASSERT(stbName[0] != 0); SArray* tags = taosArrayInit(0, sizeof(void*)); SSmlKv* pTag = taosMemoryCalloc(1, sizeof(SSmlKv)); pTag->key = "group_id"; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 6f8fc748c2..39bb6798aa 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -105,7 +105,7 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet int32_t size = encoder.pos; int32_t tlen = sizeof(SMsgHead) + size; tEncoderClear(&encoder); - void* buf = taosMemoryMalloc(tlen); + void* buf = taosMemoryCalloc(1, tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -157,6 +157,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj* } sdbRelease(pMnode->pSdb, pDb); + memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t sz = taosArrayGetSize(pVgs); SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); @@ -166,6 +167,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj* for (int32_t j = 0; j < sinkLvSize; j++) { SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); if (pLastLevelTask->nodeId == pVgInfo->vgId) { + ASSERT(pVgInfo->vgId > 0); pVgInfo->taskId = pLastLevelTask->taskId; ASSERT(pVgInfo->taskId != 0); break; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 59ec2b5ceb..ca10e7d956 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -134,6 +134,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM int32_t sz = taosArrayGetSize(vgInfo); for (int32_t i = 0; i < sz; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + ASSERT(pVgInfo->vgId > 0); if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { vgId = pVgInfo->vgId; downstreamTaskId = pVgInfo->taskId; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 7a7d9d15ad..a35e7679a1 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -70,7 +70,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; - /*if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/ + if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; } if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1; @@ -119,8 +119,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - /*if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/ if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; } if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1;