fix(stream): build ctb name
This commit is contained in:
parent
73e4dd9543
commit
342a1ae42d
|
@ -209,15 +209,6 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLi
|
||||||
|
|
||||||
/* --------------------------TMQ INTERFACE------------------------------- */
|
/* --------------------------TMQ INTERFACE------------------------------- */
|
||||||
|
|
||||||
#if 0
|
|
||||||
enum {
|
|
||||||
TMQ_RESP_ERR__FAIL = -1,
|
|
||||||
TMQ_RESP_ERR__SUCCESS = 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef int32_t tmq_resp_err_t;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
typedef struct tmq_t tmq_t;
|
typedef struct tmq_t tmq_t;
|
||||||
typedef struct tmq_conf_t tmq_conf_t;
|
typedef struct tmq_conf_t tmq_conf_t;
|
||||||
typedef struct tmq_list_t tmq_list_t;
|
typedef struct tmq_list_t tmq_list_t;
|
||||||
|
@ -236,15 +227,13 @@ DLL_EXPORT const char *tmq_err2str(int32_t code);
|
||||||
|
|
||||||
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
|
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
|
||||||
|
|
||||||
DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
|
DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
|
||||||
DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
|
DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
|
||||||
DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
|
DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
|
||||||
// timeout: -1 means infinitely waiting
|
|
||||||
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
|
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
|
||||||
DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
|
DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
|
||||||
|
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
|
||||||
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
|
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
|
||||||
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
|
|
||||||
|
|
||||||
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
|
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
|
||||||
|
|
||||||
|
|
|
@ -1713,6 +1713,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
||||||
}
|
}
|
||||||
|
|
||||||
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) {
|
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) {
|
||||||
|
ASSERT(stbName[0] != 0);
|
||||||
SArray* tags = taosArrayInit(0, sizeof(void*));
|
SArray* tags = taosArrayInit(0, sizeof(void*));
|
||||||
SSmlKv* pTag = taosMemoryCalloc(1, sizeof(SSmlKv));
|
SSmlKv* pTag = taosMemoryCalloc(1, sizeof(SSmlKv));
|
||||||
pTag->key = "group_id";
|
pTag->key = "group_id";
|
||||||
|
|
|
@ -105,7 +105,7 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
|
||||||
int32_t size = encoder.pos;
|
int32_t size = encoder.pos;
|
||||||
int32_t tlen = sizeof(SMsgHead) + size;
|
int32_t tlen = sizeof(SMsgHead) + size;
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
void* buf = taosMemoryMalloc(tlen);
|
void* buf = taosMemoryCalloc(1, tlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -157,6 +157,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj*
|
||||||
}
|
}
|
||||||
sdbRelease(pMnode->pSdb, pDb);
|
sdbRelease(pMnode->pSdb, pDb);
|
||||||
|
|
||||||
|
memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
|
||||||
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
int32_t sz = taosArrayGetSize(pVgs);
|
int32_t sz = taosArrayGetSize(pVgs);
|
||||||
SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
|
SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
|
||||||
|
@ -166,6 +167,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj*
|
||||||
for (int32_t j = 0; j < sinkLvSize; j++) {
|
for (int32_t j = 0; j < sinkLvSize; j++) {
|
||||||
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
|
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
|
||||||
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
|
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
|
||||||
|
ASSERT(pVgInfo->vgId > 0);
|
||||||
pVgInfo->taskId = pLastLevelTask->taskId;
|
pVgInfo->taskId = pLastLevelTask->taskId;
|
||||||
ASSERT(pVgInfo->taskId != 0);
|
ASSERT(pVgInfo->taskId != 0);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -134,6 +134,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
|
||||||
int32_t sz = taosArrayGetSize(vgInfo);
|
int32_t sz = taosArrayGetSize(vgInfo);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
|
ASSERT(pVgInfo->vgId > 0);
|
||||||
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
|
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
|
||||||
vgId = pVgInfo->vgId;
|
vgId = pVgInfo->vgId;
|
||||||
downstreamTaskId = pVgInfo->taskId;
|
downstreamTaskId = pVgInfo->taskId;
|
||||||
|
|
|
@ -70,7 +70,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||||
if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
|
if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
|
||||||
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||||
if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
|
if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
|
||||||
/*if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
|
if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
|
||||||
}
|
}
|
||||||
if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1;
|
if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1;
|
||||||
|
|
||||||
|
@ -119,8 +119,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||||
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
|
||||||
if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
|
if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
|
||||||
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||||
/*if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
|
|
||||||
if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
|
if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
|
||||||
}
|
}
|
||||||
if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue