use taskid of operator

This commit is contained in:
54liuyao 2024-01-18 09:22:39 +08:00
parent 1309e56a95
commit 7986951f44
2 changed files with 8 additions and 8 deletions

View File

@ -200,6 +200,7 @@ typedef struct SExchangeInfo {
uint64_t self;
SLimitInfo limitInfo;
int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo
char* pTaskId;
} SExchangeInfo;
typedef struct SScanInfo {

View File

@ -37,7 +37,7 @@ typedef struct SSourceDataInfo {
int64_t startTime;
int32_t code;
EX_SOURCE_STATUS status;
char* taskId;
const char* taskId;
SArray* pSrcUidList;
int32_t srcOpType;
bool tableSeq;
@ -261,11 +261,12 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const
}
int32_t len = strlen(id) + 1;
pInfo->pTaskId = taosMemoryCalloc(1, len);
strncpy(pInfo->pTaskId, id, len);
for (int32_t i = 0; i < numOfSources; ++i) {
SSourceDataInfo dataInfo = {0};
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
dataInfo.taskId = taosMemoryCalloc(1, len);
strncpy(dataInfo.taskId, id, len);
dataInfo.taskId = pInfo->pTaskId;
dataInfo.index = i;
SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
if (pDs == NULL) {
@ -370,7 +371,6 @@ void freeBlock(void* pParam) {
void freeSourceDataInfo(void* p) {
SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
taosMemoryFreeClear(pInfo->pRsp);
taosMemoryFreeClear(pInfo->taskId);
}
void doDestroyExchangeOperatorInfo(void* param) {
@ -386,6 +386,8 @@ void doDestroyExchangeOperatorInfo(void* param) {
tSimpleHashCleanup(pExInfo->pHashSources);
tsem_destroy(&pExInfo->ready);
taosMemoryFreeClear(pExInfo->pTaskId);
taosMemoryFreeClear(param);
}
@ -785,10 +787,7 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic
if (pIdx->inUseIdx < 0) {
SSourceDataInfo dataInfo = {0};
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
char* pTaskId = GET_TASKID(pOperator->pTaskInfo);
int32_t len = strlen(pTaskId) + 1;
dataInfo.taskId = taosMemoryCalloc(1, len);
strncpy(dataInfo.taskId, pTaskId, len);
dataInfo.taskId = pExchangeInfo->pTaskId;
dataInfo.index = pIdx->srcIdx;
dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
dataInfo.srcOpType = pBasicParam->srcOpType;