Merge pull request #10020 from taosdata/feature/3.0_liaohj
Feature/3.0 liaohj
This commit is contained in:
commit
36cbe6ad34
|
@ -75,7 +75,6 @@ typedef struct STsdbQueryCond {
|
|||
} STsdbQueryCond;
|
||||
|
||||
typedef struct {
|
||||
void *pTable;
|
||||
TSKEY lastKey;
|
||||
uint64_t uid;
|
||||
} STableKeyInfo;
|
||||
|
@ -141,7 +140,7 @@ bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle);
|
|||
*/
|
||||
int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
|
||||
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
|
||||
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId);
|
||||
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId);
|
||||
/**
|
||||
* get num of rows in mem table
|
||||
*
|
||||
|
|
|
@ -81,7 +81,6 @@ enum {
|
|||
CHECKINFO_CHOSEN_BOTH = 2 //for update=2(merge case)
|
||||
};
|
||||
|
||||
|
||||
typedef struct STableCheckInfo {
|
||||
uint64_t tableId;
|
||||
TSKEY lastKey;
|
||||
|
@ -209,31 +208,6 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load
|
|||
return pLocalIdList;
|
||||
}
|
||||
|
||||
static void tsdbMayTakeMemSnapshot(STsdbReadHandle* pTsdbReadHandle, SArray* psTable) {
|
||||
// assert(pTsdbReadHandle != NULL && pTsdbReadHandle->pMemRef != NULL);
|
||||
//
|
||||
// STsdbMemTable* pMemRef = pTsdbReadHandle->pMemRef;
|
||||
// if (pTsdbReadHandle->pMemRef->ref++ == 0) {
|
||||
// tsdbTakeMemSnapshot(pTsdbReadHandle->pTsdb, &(pMemRef->snapshot), psTable);
|
||||
// }
|
||||
//
|
||||
// taosArrayDestroy(psTable);
|
||||
}
|
||||
|
||||
static void tsdbMayUnTakeMemSnapshot(STsdbReadHandle* pTsdbReadHandle) {
|
||||
// assert(pTsdbReadHandle != NULL);
|
||||
// STsdbMemTable* pMemRef = pTsdbReadHandle->pMemRef;
|
||||
// if (pMemRef == NULL) { // it has been freed
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// if (--pMemRef->ref == 0) {
|
||||
// tsdbUnTakeMemSnapShot(pTsdbReadHandle->pTsdb, &(pMemRef->snapshot));
|
||||
// }
|
||||
//
|
||||
// pTsdbReadHandle->pMemRef = NULL;
|
||||
}
|
||||
|
||||
//int64_t tsdbGetNumOfRowsInMemTable(tsdbReadHandleT* pHandle) {
|
||||
// STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
|
||||
//
|
||||
|
@ -263,9 +237,9 @@ static void tsdbMayUnTakeMemSnapshot(STsdbReadHandle* pTsdbReadHandle) {
|
|||
// return rows;
|
||||
//}
|
||||
|
||||
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* pGroupList, SArray** psTable) {
|
||||
size_t sizeOfGroup = taosArrayGetSize(pGroupList->pGroupList);
|
||||
assert(sizeOfGroup >= 1);
|
||||
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* pGroupList) {
|
||||
size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);
|
||||
assert(numOfGroup >= 1);
|
||||
|
||||
// allocate buffer in order to load data blocks from file
|
||||
SArray* pTableCheckInfo = taosArrayInit(pGroupList->numOfTables, sizeof(STableCheckInfo));
|
||||
|
@ -273,14 +247,8 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SArray* pTable = taosArrayInit(4, sizeof(STable*));
|
||||
if (pTable == NULL) {
|
||||
taosArrayDestroy(pTableCheckInfo);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// todo apply the lastkey of table check to avoid to load header file
|
||||
for (int32_t i = 0; i < sizeOfGroup; ++i) {
|
||||
for (int32_t i = 0; i < numOfGroup; ++i) {
|
||||
SArray* group = *(SArray**) taosArrayGet(pGroupList->pGroupList, i);
|
||||
|
||||
size_t gsize = taosArrayGetSize(group);
|
||||
|
@ -289,12 +257,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
|
|||
for (int32_t j = 0; j < gsize; ++j) {
|
||||
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
|
||||
|
||||
STableCheckInfo info = { .lastKey = pKeyInfo->lastKey};
|
||||
// assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
|
||||
// info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
|
||||
|
||||
info.tableId = pKeyInfo->uid;
|
||||
|
||||
STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid};
|
||||
if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
|
||||
if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReadHandle->window.skey) {
|
||||
info.lastKey = pTsdbReadHandle->window.skey;
|
||||
|
@ -310,13 +273,8 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
|
|||
}
|
||||
}
|
||||
|
||||
// taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
|
||||
size_t gsize = taosArrayGetSize(pTableCheckInfo);
|
||||
// for (int32_t i = 0; i < gsize; ++i) {
|
||||
// STableCheckInfo* pInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, i);
|
||||
// }
|
||||
|
||||
*psTable = pTable;
|
||||
// TODO group table according to the tag value.
|
||||
taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
|
||||
return pTableCheckInfo;
|
||||
}
|
||||
|
||||
|
@ -484,18 +442,17 @@ tsdbReadHandleT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroup
|
|||
return (tsdbReadHandleT*) pTsdbReadHandle;
|
||||
}
|
||||
|
||||
SArray* psTable = NULL;
|
||||
|
||||
// todo apply the lastkey of table check to avoid to load header file
|
||||
pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, &psTable);
|
||||
pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, groupList);
|
||||
if (pTsdbReadHandle->pTableCheckInfo == NULL) {
|
||||
// tsdbCleanupQueryHandle(pTsdbReadHandle);
|
||||
taosArrayDestroy(psTable);
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tsdbDebug("%p total numOfTable:%" PRIzu " in query, %s", pTsdbReadHandle, taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), pTsdbReadHandle->idStr);
|
||||
tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %"PRIzu" %s", pTsdbReadHandle, taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo),
|
||||
taosArrayGetSize(groupList->pGroupList), pTsdbReadHandle->idStr);
|
||||
|
||||
return (tsdbReadHandleT) pTsdbReadHandle;
|
||||
}
|
||||
|
||||
|
@ -2640,7 +2597,7 @@ static int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
|
|||
break;
|
||||
}
|
||||
|
||||
STableKeyInfo info = {.pTable = NULL, .lastKey = TSKEY_INITIAL_VAL, uid = id};
|
||||
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id};
|
||||
taosArrayPush(list, &info);
|
||||
}
|
||||
|
||||
|
@ -3231,7 +3188,7 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
|
|||
if (key < lastKey) {
|
||||
key = lastKey;
|
||||
|
||||
keyInfo.pTable = pInfo->pTable;
|
||||
// keyInfo.pTable = pInfo->pTable;
|
||||
keyInfo.lastKey = key;
|
||||
pInfo->lastKey = key;
|
||||
|
||||
|
@ -3245,29 +3202,19 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
|
|||
}
|
||||
}
|
||||
|
||||
// clear current group, unref unused table
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);
|
||||
|
||||
// keyInfo.pTable may be NULL here.
|
||||
if (pInfo->pTable != keyInfo.pTable) {
|
||||
// tsdbUnRefTable(pInfo->pTable);
|
||||
}
|
||||
}
|
||||
|
||||
// more than one table in each group, only one table left for each group
|
||||
if (keyInfo.pTable != NULL) {
|
||||
totalNumOfTable++;
|
||||
if (taosArrayGetSize(pGroup) == 1) {
|
||||
// do nothing
|
||||
} else {
|
||||
taosArrayClear(pGroup);
|
||||
taosArrayPush(pGroup, &keyInfo);
|
||||
}
|
||||
} else { // mark all the empty groups, and remove it later
|
||||
taosArrayDestroy(pGroup);
|
||||
taosArrayPush(emptyGroup, &j);
|
||||
}
|
||||
// if (keyInfo.pTable != NULL) {
|
||||
// totalNumOfTable++;
|
||||
// if (taosArrayGetSize(pGroup) == 1) {
|
||||
// // do nothing
|
||||
// } else {
|
||||
// taosArrayClear(pGroup);
|
||||
// taosArrayPush(pGroup, &keyInfo);
|
||||
// }
|
||||
// } else { // mark all the empty groups, and remove it later
|
||||
// taosArrayDestroy(pGroup);
|
||||
// taosArrayPush(emptyGroup, &j);
|
||||
// }
|
||||
}
|
||||
|
||||
// window does not being updated, so set the original
|
||||
|
@ -3457,11 +3404,13 @@ void filterPrepare(void* expr, void* param) {
|
|||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
|
||||
#if 0
|
||||
STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
|
||||
STable* pTable1 = ((STableKeyInfo*) p1)->pTable;
|
||||
STable* pTable2 = ((STableKeyInfo*) p2)->pTable;
|
||||
STable* pTable1 = ((STableKeyInfo*) p1)->uid;
|
||||
STable* pTable2 = ((STableKeyInfo*) p2)->uid;
|
||||
|
||||
for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
|
||||
SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
|
||||
|
@ -3509,10 +3458,9 @@ static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *pa
|
|||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
|
||||
if (((STableCheckInfo*)key1)->tableId < ((STableCheckInfo*)key2)->tableId) {
|
||||
|
@ -3528,10 +3476,9 @@ static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
|
|||
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, TSKEY skey,
|
||||
STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) {
|
||||
STable* pTable = taosArrayGetP(pTableList, 0);
|
||||
|
||||
SArray* g = taosArrayInit(16, sizeof(STableKeyInfo));
|
||||
|
||||
STableKeyInfo info = {.pTable = pTable, .lastKey = skey};
|
||||
STableKeyInfo info = {.lastKey = skey};
|
||||
taosArrayPush(g, &info);
|
||||
|
||||
for (int32_t i = 1; i < numOfTables; ++i) {
|
||||
|
@ -3542,13 +3489,13 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
|
|||
assert(ret == 0 || ret == -1);
|
||||
|
||||
if (ret == 0) {
|
||||
STableKeyInfo info1 = {.pTable = *p, .lastKey = skey};
|
||||
STableKeyInfo info1 = {.lastKey = skey};
|
||||
taosArrayPush(g, &info1);
|
||||
} else {
|
||||
taosArrayPush(pGroups, &g); // current group is ended, start a new group
|
||||
g = taosArrayInit(16, sizeof(STableKeyInfo));
|
||||
|
||||
STableKeyInfo info1 = {.pTable = *p, .lastKey = skey};
|
||||
STableKeyInfo info1 = {.lastKey = skey};
|
||||
taosArrayPush(g, &info1);
|
||||
}
|
||||
}
|
||||
|
@ -3581,8 +3528,8 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd
|
|||
sup.pTagSchema = pTagSchema->pSchema;
|
||||
sup.pCols = pCols;
|
||||
|
||||
// taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
|
||||
// createTableGroupImpl(pTableGroup, pTableList, size, skey, &sup, tableGroupComparFn);
|
||||
taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
|
||||
createTableGroupImpl(pTableGroup, pTableList, size, skey, &sup, tableGroupComparFn);
|
||||
}
|
||||
|
||||
return pTableGroup;
|
||||
|
@ -3690,16 +3637,16 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd
|
|||
|
||||
int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
|
||||
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
|
||||
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId) {
|
||||
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) {
|
||||
STbCfg* pTbCfg = metaGetTbInfoByUid(tsdb->pMeta, uid);
|
||||
if (pTbCfg == NULL) {
|
||||
tsdbError("%p failed to get stable, uid:%"PRIu64", reqId:0x%"PRIx64, tsdb, uid, reqId);
|
||||
tsdbError("%p failed to get stable, uid:%"PRIu64", TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, uid, taskId, reqId);
|
||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (pTbCfg->type != META_SUPER_TABLE) {
|
||||
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", reId:0x%"PRIx64, tsdb, uid, reqId);
|
||||
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, uid, taskId, reqId);
|
||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
|
||||
goto _error;
|
||||
}
|
||||
|
@ -3718,8 +3665,8 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch
|
|||
pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res);
|
||||
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
|
||||
|
||||
tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu", tsdb,
|
||||
pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
|
||||
tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb,
|
||||
pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId);
|
||||
|
||||
taosArrayDestroy(res);
|
||||
return ret;
|
||||
|
@ -3892,7 +3839,7 @@ void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) {
|
|||
tfree(pTsdbReadHandle->statis);
|
||||
|
||||
if (!emptyQueryTimewindow(pTsdbReadHandle)) {
|
||||
tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
|
||||
// tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
|
||||
} else {
|
||||
assert(pTsdbReadHandle->pTableCheckInfo == NULL);
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@
|
|||
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
|
||||
#define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES)
|
||||
|
||||
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.idstr)
|
||||
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
|
||||
|
||||
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
|
||||
|
||||
|
|
|
@ -239,8 +239,7 @@ typedef struct STaskIdInfo {
|
|||
uint64_t queryId; // this is also a request id
|
||||
uint64_t subplanId;
|
||||
uint64_t templateId;
|
||||
uint64_t taskId; // this is a subplan id
|
||||
char *idstr;
|
||||
char *str;
|
||||
} STaskIdInfo;
|
||||
|
||||
typedef struct SExecTaskInfo {
|
||||
|
@ -639,7 +638,6 @@ int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg);
|
|||
bool isTaskKilled(SExecTaskInfo *pTaskInfo);
|
||||
int32_t checkForQueryBuf(size_t numOfTables);
|
||||
bool checkNeedToCompressQueryCol(SQInfo *pQInfo);
|
||||
bool doBuildResCheck(SQInfo* pQInfo);
|
||||
void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status);
|
||||
|
||||
bool onlyQueryTags(STaskAttr* pQueryAttr);
|
||||
|
|
|
@ -18,20 +18,20 @@
|
|||
#include "executorimpl.h"
|
||||
#include "planner.h"
|
||||
|
||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, uint64_t reqId) {
|
||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) {
|
||||
ASSERT(pOperator != NULL);
|
||||
if (pOperator->operatorType != OP_StreamScan) {
|
||||
if (pOperator->numOfDownstream == 0) {
|
||||
qError("failed to find stream scan operator to set the input data block, reqId:0x%" PRIx64, reqId);
|
||||
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
||||
if (pOperator->numOfDownstream > 1) { // not handle this in join query
|
||||
qError("join not supported for stream block scan, reqId:0x%" PRIx64, reqId);
|
||||
qError("join not supported for stream block scan, %s" PRIx64, id);
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
||||
return doSetStreamBlock(pOperator->pDownstream[0], input, reqId);
|
||||
return doSetStreamBlock(pOperator->pDownstream[0], input, id);
|
||||
} else {
|
||||
SStreamBlockScanInfo* pInfo = pOperator->info;
|
||||
tqReadHandleSetMsg(pInfo->readerHandle, input, 0);
|
||||
|
@ -50,7 +50,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) {
|
|||
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
|
||||
|
||||
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input, pTaskInfo->id.queryId);
|
||||
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input, GET_TASKID(pTaskInfo));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
|
||||
} else {
|
||||
|
|
|
@ -5146,7 +5146,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
|||
|
||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
||||
if (pExchangeInfo->current >= totalSources) {
|
||||
qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", pTaskInfo->id.idstr, totalSources,
|
||||
qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
|
||||
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -5208,7 +5208,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
|||
int64_t el = taosGetTimestampUs() - startTs;
|
||||
pExchangeInfo->totalElapsed += el;
|
||||
|
||||
qDebug("%s all %"PRIzu" sources are exhausted, total rows: %"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", pTaskInfo->id.idstr, totalSources,
|
||||
qDebug("%s all %"PRIzu" sources are exhausted, total rows: %"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
|
||||
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0);
|
||||
return NULL;
|
||||
} else {
|
||||
|
@ -7741,11 +7741,10 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) {
|
|||
|
||||
pTaskInfo->cost.created = taosGetTimestampMs();
|
||||
pTaskInfo->id.queryId = queryId;
|
||||
pTaskInfo->id.taskId = taskId;
|
||||
|
||||
char* p = calloc(1, 128);
|
||||
snprintf(p, 128, "TID:0x%"PRIx64" QID:0x%"PRIx64, taskId, queryId);
|
||||
pTaskInfo->id.idstr = strdup(p);
|
||||
pTaskInfo->id.str = strdup(p);
|
||||
|
||||
return pTaskInfo;
|
||||
}
|
||||
|
@ -7822,7 +7821,7 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode,
|
|||
|
||||
if (tableType == TSDB_SUPER_TABLE) {
|
||||
code =
|
||||
tsdbQuerySTableByTagCond(readerHandle, uid, window.skey, NULL, 0, 0, NULL, &groupInfo, NULL, 0, queryId);
|
||||
tsdbQuerySTableByTagCond(readerHandle, uid, window.skey, NULL, 0, 0, NULL, &groupInfo, NULL, 0, queryId, taskId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -7832,7 +7831,7 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode,
|
|||
|
||||
SArray* pa = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||
|
||||
STableKeyInfo info = {.pTable = NULL, .lastKey = 0, .uid = uid};
|
||||
STableKeyInfo info = {.lastKey = 0, .uid = uid};
|
||||
taosArrayPush(pa, &info);
|
||||
taosArrayPush(groupInfo.pGroupList, &pa);
|
||||
}
|
||||
|
@ -8827,35 +8826,17 @@ void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) {
|
|||
}
|
||||
|
||||
void doDestroyTask(SExecTaskInfo *pTaskInfo) {
|
||||
qDebug("%s start to free execTask", GET_TASKID(pTaskInfo));
|
||||
doDestroyTableQueryInfo(&pTaskInfo->tableqinfoGroupInfo);
|
||||
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
|
||||
// taosHashCleanup(pTaskInfo->summary.operatorProfResults);
|
||||
|
||||
tfree(pTaskInfo->sql);
|
||||
tfree(pTaskInfo->id.str);
|
||||
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
|
||||
|
||||
tfree(pTaskInfo);
|
||||
}
|
||||
|
||||
bool doBuildResCheck(SQInfo* pQInfo) {
|
||||
bool buildRes = false;
|
||||
|
||||
pthread_mutex_lock(&pQInfo->lock);
|
||||
|
||||
pQInfo->dataReady = QUERY_RESULT_READY;
|
||||
buildRes = needBuildResAfterQueryComplete(pQInfo);
|
||||
|
||||
// clear qhandle owner, it must be in the secure area. other thread may run ahead before current, after it is
|
||||
// put into task to be executed.
|
||||
assert(pQInfo->owner == taosGetSelfPthreadId());
|
||||
pQInfo->owner = 0;
|
||||
|
||||
pthread_mutex_unlock(&pQInfo->lock);
|
||||
|
||||
// used in retrieve blocking model.
|
||||
tsem_post(&pQInfo->ready);
|
||||
return buildRes;
|
||||
}
|
||||
|
||||
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes) {
|
||||
if (val == NULL) {
|
||||
setNull(output, type, bytes);
|
||||
|
|
Loading…
Reference in New Issue