clone task id
This commit is contained in:
parent
567aab933d
commit
94ab74cc7f
|
@ -37,7 +37,7 @@ typedef struct SSourceDataInfo {
|
||||||
int64_t startTime;
|
int64_t startTime;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
EX_SOURCE_STATUS status;
|
EX_SOURCE_STATUS status;
|
||||||
const char* taskId;
|
char* taskId;
|
||||||
SArray* pSrcUidList;
|
SArray* pSrcUidList;
|
||||||
int32_t srcOpType;
|
int32_t srcOpType;
|
||||||
bool tableSeq;
|
bool tableSeq;
|
||||||
|
@ -260,14 +260,16 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t len = strlen(id);
|
||||||
for (int32_t i = 0; i < numOfSources; ++i) {
|
for (int32_t i = 0; i < numOfSources; ++i) {
|
||||||
SSourceDataInfo dataInfo = {0};
|
SSourceDataInfo dataInfo = {0};
|
||||||
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
|
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
|
||||||
dataInfo.taskId = id;
|
dataInfo.taskId = taosMemoryCalloc(1, len);
|
||||||
|
memcpy(dataInfo.taskId, id, len);
|
||||||
dataInfo.index = i;
|
dataInfo.index = i;
|
||||||
SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
|
SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
|
||||||
if (pDs == NULL) {
|
if (pDs == NULL) {
|
||||||
taosArrayDestroy(pInfo->pSourceDataInfo);
|
taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -368,6 +370,7 @@ void freeBlock(void* pParam) {
|
||||||
void freeSourceDataInfo(void* p) {
|
void freeSourceDataInfo(void* p) {
|
||||||
SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
|
SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
|
||||||
taosMemoryFreeClear(pInfo->pRsp);
|
taosMemoryFreeClear(pInfo->pRsp);
|
||||||
|
taosMemoryFreeClear(pInfo->taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
void doDestroyExchangeOperatorInfo(void* param) {
|
void doDestroyExchangeOperatorInfo(void* param) {
|
||||||
|
@ -782,7 +785,10 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic
|
||||||
if (pIdx->inUseIdx < 0) {
|
if (pIdx->inUseIdx < 0) {
|
||||||
SSourceDataInfo dataInfo = {0};
|
SSourceDataInfo dataInfo = {0};
|
||||||
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
|
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
|
||||||
dataInfo.taskId = GET_TASKID(pOperator->pTaskInfo);
|
char* pTaskId = GET_TASKID(pOperator->pTaskInfo);
|
||||||
|
int32_t len = strlen(pTaskId);
|
||||||
|
dataInfo.taskId = taosMemoryCalloc(1, len);
|
||||||
|
memcpy(dataInfo.taskId, pTaskId, len);
|
||||||
dataInfo.index = pIdx->srcIdx;
|
dataInfo.index = pIdx->srcIdx;
|
||||||
dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
|
dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
|
||||||
dataInfo.srcOpType = pBasicParam->srcOpType;
|
dataInfo.srcOpType = pBasicParam->srcOpType;
|
||||||
|
|
Loading…
Reference in New Issue