refactor: do some internal refactor.
This commit is contained in:
parent
72bbc27e8c
commit
6f4851a6a2
|
@ -108,6 +108,9 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
|
||||||
|
|
||||||
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
|
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
|
||||||
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pListInfo);
|
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pListInfo);
|
||||||
|
int32_t getGroupIdFromTableTags(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId);
|
||||||
|
size_t getTableTagsBufLen(const SNodeList* pGroups);
|
||||||
|
|
||||||
SArray* createSortInfo(SNodeList* pNodeList);
|
SArray* createSortInfo(SNodeList* pNodeList);
|
||||||
SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||||
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
|
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
|
||||||
|
|
|
@ -423,6 +423,7 @@ typedef struct SStreamScanInfo {
|
||||||
// status for tmq
|
// status for tmq
|
||||||
// SSchemaWrapper schema;
|
// SSchemaWrapper schema;
|
||||||
STqOffset offset;
|
STqOffset offset;
|
||||||
|
SNodeList* pGroupTags;
|
||||||
SNode* pTagCond;
|
SNode* pTagCond;
|
||||||
SNode* pTagIndexCond;
|
SNode* pTagIndexCond;
|
||||||
} SStreamScanInfo;
|
} SStreamScanInfo;
|
||||||
|
|
|
@ -379,6 +379,82 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t getTableTagsBufLen(const SNodeList* pGroups) {
|
||||||
|
size_t keyLen = 0;
|
||||||
|
|
||||||
|
SNode* node;
|
||||||
|
FOREACH(node, pGroups) {
|
||||||
|
SExprNode* pExpr = (SExprNode*)node;
|
||||||
|
keyLen += pExpr->resType.bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
keyLen += sizeof(int8_t) * LIST_LENGTH(pGroups);
|
||||||
|
return keyLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t getGroupIdFromTableTags(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId) {
|
||||||
|
SMetaReader mr = {0};
|
||||||
|
metaReaderInit(&mr, pMeta, 0);
|
||||||
|
metaGetTableEntryByUid(&mr, uid);
|
||||||
|
|
||||||
|
SNodeList* groupNew = nodesCloneList(pGroupNode);
|
||||||
|
|
||||||
|
nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &mr);
|
||||||
|
char* isNull = (char*)keyBuf;
|
||||||
|
char* pStart = (char*)keyBuf + sizeof(int8_t)*LIST_LENGTH(pGroupNode);
|
||||||
|
|
||||||
|
SNode* pNode;
|
||||||
|
int32_t index = 0;
|
||||||
|
FOREACH(pNode, groupNew) {
|
||||||
|
SNode* pNew = NULL;
|
||||||
|
int32_t code = scalarCalculateConstants(pNode, &pNew);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
REPLACE_NODE(pNew);
|
||||||
|
} else {
|
||||||
|
taosMemoryFree(keyBuf);
|
||||||
|
nodesDestroyList(groupNew);
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
|
||||||
|
SValueNode* pValue = (SValueNode*)pNew;
|
||||||
|
|
||||||
|
if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
|
||||||
|
isNull[index++] = 1;
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
isNull[index++] = 0;
|
||||||
|
char* data = nodesGetValueFromNode(pValue);
|
||||||
|
if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
if (tTagIsJson(data)) {
|
||||||
|
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
|
||||||
|
taosMemoryFree(keyBuf);
|
||||||
|
nodesDestroyList(groupNew);
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
int32_t len = getJsonValueLen(data);
|
||||||
|
memcpy(pStart, data, len);
|
||||||
|
pStart += len;
|
||||||
|
} else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
|
||||||
|
memcpy(pStart, data, varDataTLen(data));
|
||||||
|
pStart += varDataTLen(data);
|
||||||
|
} else {
|
||||||
|
memcpy(pStart, data, pValue->node.resType.bytes);
|
||||||
|
pStart += pValue->node.resType.bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t len = (int32_t)(pStart - (char*)keyBuf);
|
||||||
|
*pGroupId = calcGroupId(keyBuf, len);
|
||||||
|
|
||||||
|
nodesDestroyList(groupNew);
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SArray* extractPartitionColInfo(SNodeList* pNodeList) {
|
SArray* extractPartitionColInfo(SNodeList* pNodeList) {
|
||||||
if (!pNodeList) {
|
if (!pNodeList) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -14,10 +14,21 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
|
#include "tref.h"
|
||||||
#include "executorimpl.h"
|
#include "executorimpl.h"
|
||||||
#include "planner.h"
|
#include "planner.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
|
#include "tudf.h"
|
||||||
|
|
||||||
|
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
|
||||||
|
int32_t exchangeObjRefPool = -1;
|
||||||
|
|
||||||
|
static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); }
|
||||||
|
static void cleanupRefPool() {
|
||||||
|
int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0);
|
||||||
|
taosCloseRef(ref);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, bool assignUid,
|
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, bool assignUid,
|
||||||
char* id) {
|
char* id) {
|
||||||
|
@ -184,8 +195,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
|
||||||
return pTaskInfo;
|
return pTaskInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList,
|
static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) {
|
||||||
const char* idstr) {
|
|
||||||
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
|
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
|
||||||
|
|
||||||
// let's discard the tables those are not created according to the queried super table.
|
// let's discard the tables those are not created according to the queried super table.
|
||||||
|
@ -239,7 +249,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SStreamScanInfo* pScanInfo = pInfo->info;
|
SStreamScanInfo* pScanInfo = pInfo->info;
|
||||||
if (isAdd) { // add new table id
|
if (isAdd) { // add new table id
|
||||||
SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));
|
SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
|
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
|
||||||
code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
|
code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
|
||||||
|
@ -247,15 +257,35 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// add to qTaskInfo
|
|
||||||
// todo refactor STableList
|
// todo refactor STableList
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(qa); ++i) {
|
size_t bufLen = (pScanInfo->pGroupTags != NULL)? getTableTagsBufLen(pScanInfo->pGroupTags):0;
|
||||||
uint64_t* uid = taosArrayGet(qa, i);
|
char* keyBuf = NULL;
|
||||||
|
if (bufLen > 0) {
|
||||||
|
keyBuf = taosMemoryMalloc(bufLen);
|
||||||
|
if (keyBuf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < taosArrayGetSize(qa); ++i) {
|
||||||
|
uint64_t* uid = taosArrayGet(qa, i);
|
||||||
STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
|
STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
|
||||||
|
|
||||||
|
if (bufLen > 0) {
|
||||||
|
code = getGroupIdFromTableTags(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
|
||||||
|
&keyInfo.groupId);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
|
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (keyBuf != NULL) {
|
||||||
|
taosMemoryFree(keyBuf);
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayDestroy(qa);
|
taosArrayDestroy(qa);
|
||||||
} else { // remove the table id in current list
|
} else { // remove the table id in current list
|
||||||
qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
|
qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
|
||||||
|
@ -289,3 +319,378 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
|
||||||
|
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
|
||||||
|
assert(pSubplan != NULL);
|
||||||
|
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
|
||||||
|
|
||||||
|
taosThreadOnce(&initPoolOnce, initRefPool);
|
||||||
|
atexit(cleanupRefPool);
|
||||||
|
|
||||||
|
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
|
||||||
|
code = dsDataSinkMgtInit(&cfg);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (handle) {
|
||||||
|
void* pSinkParam = NULL;
|
||||||
|
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
|
||||||
|
}
|
||||||
|
|
||||||
|
_error:
|
||||||
|
// if failed to add ref for all tables in this query, abort current query
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef TEST_IMPL
|
||||||
|
// wait moment
|
||||||
|
int waitMoment(SQInfo* pQInfo) {
|
||||||
|
if (pQInfo->sql) {
|
||||||
|
int ms = 0;
|
||||||
|
char* pcnt = strstr(pQInfo->sql, " count(*)");
|
||||||
|
if (pcnt) return 0;
|
||||||
|
|
||||||
|
char* pos = strstr(pQInfo->sql, " t_");
|
||||||
|
if (pos) {
|
||||||
|
pos += 3;
|
||||||
|
ms = atoi(pos);
|
||||||
|
while (*pos >= '0' && *pos <= '9') {
|
||||||
|
pos++;
|
||||||
|
}
|
||||||
|
char unit_char = *pos;
|
||||||
|
if (unit_char == 'h') {
|
||||||
|
ms *= 3600 * 1000;
|
||||||
|
} else if (unit_char == 'm') {
|
||||||
|
ms *= 60 * 1000;
|
||||||
|
} else if (unit_char == 's') {
|
||||||
|
ms *= 1000;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (ms == 0) return 0;
|
||||||
|
printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
|
||||||
|
|
||||||
|
if (ms < 1000) {
|
||||||
|
taosMsleep(ms);
|
||||||
|
} else {
|
||||||
|
int used_ms = 0;
|
||||||
|
while (used_ms < ms) {
|
||||||
|
taosMsleep(1000);
|
||||||
|
used_ms += 1000;
|
||||||
|
if (isTaskKilled(pQInfo)) {
|
||||||
|
printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
int64_t threadId = taosGetSelfPthreadId();
|
||||||
|
|
||||||
|
*pRes = NULL;
|
||||||
|
int64_t curOwner = 0;
|
||||||
|
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
|
||||||
|
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
|
||||||
|
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
|
||||||
|
return pTaskInfo->code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTaskInfo->cost.start == 0) {
|
||||||
|
pTaskInfo->cost.start = taosGetTimestampMs();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
|
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
// error occurs, record the error code and return to client
|
||||||
|
int32_t ret = setjmp(pTaskInfo->env);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
pTaskInfo->code = ret;
|
||||||
|
cleanUpUdfs();
|
||||||
|
qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
|
||||||
|
atomic_store_64(&pTaskInfo->owner, 0);
|
||||||
|
|
||||||
|
return pTaskInfo->code;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
|
*pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
|
||||||
|
uint64_t el = (taosGetTimestampUs() - st);
|
||||||
|
|
||||||
|
pTaskInfo->cost.elapsedTime += el;
|
||||||
|
if (NULL == *pRes) {
|
||||||
|
*useconds = pTaskInfo->cost.elapsedTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanUpUdfs();
|
||||||
|
|
||||||
|
int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
|
||||||
|
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
|
||||||
|
|
||||||
|
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
|
||||||
|
GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);
|
||||||
|
|
||||||
|
atomic_store_64(&pTaskInfo->owner, 0);
|
||||||
|
return pTaskInfo->code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qKillTask(qTaskInfo_t qinfo) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
||||||
|
|
||||||
|
if (pTaskInfo == NULL) {
|
||||||
|
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("%s execTask killed", GET_TASKID(pTaskInfo));
|
||||||
|
setTaskKilled(pTaskInfo);
|
||||||
|
|
||||||
|
// Wait for the query executing thread being stopped/
|
||||||
|
// Once the query is stopped, the owner of qHandle will be cleared immediately.
|
||||||
|
while (pTaskInfo->owner != 0) {
|
||||||
|
taosMsleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
||||||
|
|
||||||
|
if (pTaskInfo == NULL) {
|
||||||
|
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
|
||||||
|
setTaskKilled(pTaskInfo);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
void qDestroyTask(qTaskInfo_t qTaskHandle) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
|
||||||
|
if (pTaskInfo == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
|
||||||
|
|
||||||
|
queryCostStatis(pTaskInfo); // print the query cost summary
|
||||||
|
doDestroyTask(pTaskInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
int32_t capacity = 0;
|
||||||
|
|
||||||
|
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
|
||||||
|
if (pTaskInfo->pRoot == NULL) {
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t nOptrWithVal = 0;
|
||||||
|
int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
|
||||||
|
if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal = 0)) {
|
||||||
|
taosMemoryFreeClear(*pOutput);
|
||||||
|
*len = 0;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
|
||||||
|
|
||||||
|
if (pTaskInfo == NULL || pInput == NULL || len == 0) {
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
|
|
||||||
|
return decodeOperator(pTaskInfo->pRoot, pInput, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
uint8_t type = pOperator->operatorType;
|
||||||
|
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
|
*scanner = pOperator->info;
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
ASSERT(pOperator->numOfDownstream == 1);
|
||||||
|
pOperator = pOperator->pDownstream[0];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
|
||||||
|
taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
int32_t qStreamPrepareRecover(qTaskInfo_t tinfo, int64_t startVer, int64_t endVer) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
|
||||||
|
pTaskInfo->streamInfo.recoverStartVer = startVer;
|
||||||
|
pTaskInfo->streamInfo.recoverEndVer = endVer;
|
||||||
|
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* qExtractReaderFromStreamScanner(void* scanner) {
|
||||||
|
SStreamScanInfo* pInfo = scanner;
|
||||||
|
return (void*)pInfo->tqReader;
|
||||||
|
}
|
||||||
|
|
||||||
|
const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) {
|
||||||
|
SStreamScanInfo* pInfo = scanner;
|
||||||
|
return pInfo->tqReader->pSchemaWrapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
const STqOffset* qExtractStatusFromStreamScanner(void* scanner) {
|
||||||
|
SStreamScanInfo* pInfo = scanner;
|
||||||
|
return &pInfo->offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
||||||
|
return pTaskInfo->streamInfo.metaBlk;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
||||||
|
memcpy(pOffset, &pTaskInfo->streamInfo.lastStatus, sizeof(STqOffsetVal));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
||||||
|
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
||||||
|
pTaskInfo->streamInfo.prepareStatus = *pOffset;
|
||||||
|
if (!tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
|
||||||
|
while (1) {
|
||||||
|
uint8_t type = pOperator->operatorType;
|
||||||
|
pOperator->status = OP_OPENED;
|
||||||
|
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
|
SStreamScanInfo* pInfo = pOperator->info;
|
||||||
|
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||||
|
#if 0
|
||||||
|
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
|
||||||
|
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
|
||||||
|
qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version,
|
||||||
|
pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1);
|
||||||
|
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
|
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
|
||||||
|
int64_t uid = pOffset->uid;
|
||||||
|
int64_t ts = pOffset->ts;
|
||||||
|
|
||||||
|
if (uid == 0) {
|
||||||
|
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
|
||||||
|
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
|
||||||
|
uid = pTableInfo->uid;
|
||||||
|
ts = INT64_MIN;
|
||||||
|
} else {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
|
||||||
|
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
|
||||||
|
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||||
|
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
||||||
|
bool found = false;
|
||||||
|
for (int32_t i = 0; i < tableSz; i++) {
|
||||||
|
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
|
||||||
|
if (pTableInfo->uid == uid) {
|
||||||
|
found = true;
|
||||||
|
pTableScanInfo->currentTable = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO after dropping table, table may be not found
|
||||||
|
ASSERT(found);
|
||||||
|
|
||||||
|
tsdbSetTableId(pTableScanInfo->dataReader, uid);
|
||||||
|
int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
|
||||||
|
pTableScanInfo->cond.twindows.skey = ts + 1;
|
||||||
|
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
|
pTableScanInfo->cond.twindows.skey = oldSkey;
|
||||||
|
pTableScanInfo->scanTimes = 0;
|
||||||
|
|
||||||
|
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
|
||||||
|
pTableScanInfo->currentTable, tableSz);
|
||||||
|
/*}*/
|
||||||
|
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
ASSERT(pOperator->numOfDownstream == 1);
|
||||||
|
pOperator = pOperator->pDownstream[0];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
|
||||||
|
if (uid == 0) {
|
||||||
|
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
|
||||||
|
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
|
||||||
|
uid = pTableInfo->uid;
|
||||||
|
ts = INT64_MIN;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return doPrepareScan(pTaskInfo->pRoot, uid, ts);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
|
||||||
|
return doGetScanStatus(pTaskInfo->pRoot, uid, ts);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
|
@ -1,405 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "dataSinkMgt.h"
|
|
||||||
#include "os.h"
|
|
||||||
#include "tmsg.h"
|
|
||||||
#include "tref.h"
|
|
||||||
#include "tudf.h"
|
|
||||||
|
|
||||||
#include "executor.h"
|
|
||||||
#include "executorimpl.h"
|
|
||||||
#include "query.h"
|
|
||||||
|
|
||||||
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
|
|
||||||
int32_t exchangeObjRefPool = -1;
|
|
||||||
|
|
||||||
static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); }
|
|
||||||
static void cleanupRefPool() {
|
|
||||||
int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0);
|
|
||||||
taosCloseRef(ref);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
|
|
||||||
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
|
|
||||||
assert(pSubplan != NULL);
|
|
||||||
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
|
|
||||||
|
|
||||||
taosThreadOnce(&initPoolOnce, initRefPool);
|
|
||||||
atexit(cleanupRefPool);
|
|
||||||
|
|
||||||
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
|
|
||||||
code = dsDataSinkMgtInit(&cfg);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (handle) {
|
|
||||||
void* pSinkParam = NULL;
|
|
||||||
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
|
|
||||||
}
|
|
||||||
|
|
||||||
_error:
|
|
||||||
// if failed to add ref for all tables in this query, abort current query
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef TEST_IMPL
|
|
||||||
// wait moment
|
|
||||||
int waitMoment(SQInfo* pQInfo) {
|
|
||||||
if (pQInfo->sql) {
|
|
||||||
int ms = 0;
|
|
||||||
char* pcnt = strstr(pQInfo->sql, " count(*)");
|
|
||||||
if (pcnt) return 0;
|
|
||||||
|
|
||||||
char* pos = strstr(pQInfo->sql, " t_");
|
|
||||||
if (pos) {
|
|
||||||
pos += 3;
|
|
||||||
ms = atoi(pos);
|
|
||||||
while (*pos >= '0' && *pos <= '9') {
|
|
||||||
pos++;
|
|
||||||
}
|
|
||||||
char unit_char = *pos;
|
|
||||||
if (unit_char == 'h') {
|
|
||||||
ms *= 3600 * 1000;
|
|
||||||
} else if (unit_char == 'm') {
|
|
||||||
ms *= 60 * 1000;
|
|
||||||
} else if (unit_char == 's') {
|
|
||||||
ms *= 1000;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (ms == 0) return 0;
|
|
||||||
printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
|
|
||||||
|
|
||||||
if (ms < 1000) {
|
|
||||||
taosMsleep(ms);
|
|
||||||
} else {
|
|
||||||
int used_ms = 0;
|
|
||||||
while (used_ms < ms) {
|
|
||||||
taosMsleep(1000);
|
|
||||||
used_ms += 1000;
|
|
||||||
if (isTaskKilled(pQInfo)) {
|
|
||||||
printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
|
||||||
int64_t threadId = taosGetSelfPthreadId();
|
|
||||||
|
|
||||||
*pRes = NULL;
|
|
||||||
int64_t curOwner = 0;
|
|
||||||
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
|
|
||||||
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
|
|
||||||
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
|
|
||||||
return pTaskInfo->code;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTaskInfo->cost.start == 0) {
|
|
||||||
pTaskInfo->cost.start = taosGetTimestampMs();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isTaskKilled(pTaskInfo)) {
|
|
||||||
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
// error occurs, record the error code and return to client
|
|
||||||
int32_t ret = setjmp(pTaskInfo->env);
|
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
|
||||||
pTaskInfo->code = ret;
|
|
||||||
cleanUpUdfs();
|
|
||||||
qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
|
|
||||||
return pTaskInfo->code;
|
|
||||||
}
|
|
||||||
|
|
||||||
qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
|
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
|
||||||
|
|
||||||
*pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
|
|
||||||
uint64_t el = (taosGetTimestampUs() - st);
|
|
||||||
|
|
||||||
pTaskInfo->cost.elapsedTime += el;
|
|
||||||
if (NULL == *pRes) {
|
|
||||||
*useconds = pTaskInfo->cost.elapsedTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanUpUdfs();
|
|
||||||
|
|
||||||
int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
|
|
||||||
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
|
|
||||||
|
|
||||||
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
|
|
||||||
GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);
|
|
||||||
|
|
||||||
atomic_store_64(&pTaskInfo->owner, 0);
|
|
||||||
return pTaskInfo->code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qKillTask(qTaskInfo_t qinfo) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
|
||||||
|
|
||||||
if (pTaskInfo == NULL) {
|
|
||||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
|
||||||
}
|
|
||||||
|
|
||||||
qDebug("%s execTask killed", GET_TASKID(pTaskInfo));
|
|
||||||
setTaskKilled(pTaskInfo);
|
|
||||||
|
|
||||||
// Wait for the query executing thread being stopped/
|
|
||||||
// Once the query is stopped, the owner of qHandle will be cleared immediately.
|
|
||||||
while (pTaskInfo->owner != 0) {
|
|
||||||
taosMsleep(100);
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
|
||||||
|
|
||||||
if (pTaskInfo == NULL) {
|
|
||||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
|
||||||
}
|
|
||||||
|
|
||||||
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
|
|
||||||
setTaskKilled(pTaskInfo);
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
void qDestroyTask(qTaskInfo_t qTaskHandle) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
|
|
||||||
if (pTaskInfo == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
|
|
||||||
|
|
||||||
queryCostStatis(pTaskInfo); // print the query cost summary
|
|
||||||
doDestroyTask(pTaskInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
|
||||||
int32_t capacity = 0;
|
|
||||||
|
|
||||||
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
|
|
||||||
if (pTaskInfo->pRoot == NULL) {
|
|
||||||
return TSDB_CODE_INVALID_PARA;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t nOptrWithVal = 0;
|
|
||||||
int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
|
|
||||||
if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal = 0)) {
|
|
||||||
taosMemoryFreeClear(*pOutput);
|
|
||||||
*len = 0;
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
|
|
||||||
|
|
||||||
if (pTaskInfo == NULL || pInput == NULL || len == 0) {
|
|
||||||
return TSDB_CODE_INVALID_PARA;
|
|
||||||
}
|
|
||||||
|
|
||||||
return decodeOperator(pTaskInfo->pRoot, pInput, len);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
|
||||||
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
uint8_t type = pOperator->operatorType;
|
|
||||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
|
||||||
*scanner = pOperator->info;
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
ASSERT(pOperator->numOfDownstream == 1);
|
|
||||||
pOperator = pOperator->pDownstream[0];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
|
||||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
|
|
||||||
taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t qStreamPrepareRecover(qTaskInfo_t tinfo, int64_t startVer, int64_t endVer) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
|
||||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
|
|
||||||
pTaskInfo->streamInfo.recoverStartVer = startVer;
|
|
||||||
pTaskInfo->streamInfo.recoverEndVer = endVer;
|
|
||||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void* qExtractReaderFromStreamScanner(void* scanner) {
|
|
||||||
SStreamScanInfo* pInfo = scanner;
|
|
||||||
return (void*)pInfo->tqReader;
|
|
||||||
}
|
|
||||||
|
|
||||||
const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) {
|
|
||||||
SStreamScanInfo* pInfo = scanner;
|
|
||||||
return pInfo->tqReader->pSchemaWrapper;
|
|
||||||
}
|
|
||||||
|
|
||||||
const STqOffset* qExtractStatusFromStreamScanner(void* scanner) {
|
|
||||||
SStreamScanInfo* pInfo = scanner;
|
|
||||||
return &pInfo->offset;
|
|
||||||
}
|
|
||||||
|
|
||||||
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
|
||||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
|
||||||
return pTaskInfo->streamInfo.metaBlk;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
|
||||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
|
||||||
memcpy(pOffset, &pTaskInfo->streamInfo.lastStatus, sizeof(STqOffsetVal));
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
|
||||||
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
|
||||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
|
||||||
pTaskInfo->streamInfo.prepareStatus = *pOffset;
|
|
||||||
if (!tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
|
|
||||||
while (1) {
|
|
||||||
uint8_t type = pOperator->operatorType;
|
|
||||||
pOperator->status = OP_OPENED;
|
|
||||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
|
||||||
SStreamScanInfo* pInfo = pOperator->info;
|
|
||||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
|
||||||
#if 0
|
|
||||||
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
|
|
||||||
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
|
|
||||||
qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version,
|
|
||||||
pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1);
|
|
||||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
|
||||||
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
|
|
||||||
int64_t uid = pOffset->uid;
|
|
||||||
int64_t ts = pOffset->ts;
|
|
||||||
|
|
||||||
if (uid == 0) {
|
|
||||||
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
|
|
||||||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
|
|
||||||
uid = pTableInfo->uid;
|
|
||||||
ts = INT64_MIN;
|
|
||||||
} else {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
|
|
||||||
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
|
|
||||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
|
||||||
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
|
||||||
bool found = false;
|
|
||||||
for (int32_t i = 0; i < tableSz; i++) {
|
|
||||||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
|
|
||||||
if (pTableInfo->uid == uid) {
|
|
||||||
found = true;
|
|
||||||
pTableScanInfo->currentTable = i;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO after dropping table, table may be not found
|
|
||||||
ASSERT(found);
|
|
||||||
|
|
||||||
tsdbSetTableId(pTableScanInfo->dataReader, uid);
|
|
||||||
int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
|
|
||||||
pTableScanInfo->cond.twindows.skey = ts + 1;
|
|
||||||
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
|
||||||
pTableScanInfo->cond.twindows.skey = oldSkey;
|
|
||||||
pTableScanInfo->scanTimes = 0;
|
|
||||||
|
|
||||||
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
|
|
||||||
pTableScanInfo->currentTable, tableSz);
|
|
||||||
/*}*/
|
|
||||||
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
ASSERT(pOperator->numOfDownstream == 1);
|
|
||||||
pOperator = pOperator->pDownstream[0];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
|
||||||
|
|
||||||
if (uid == 0) {
|
|
||||||
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
|
|
||||||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
|
|
||||||
uid = pTableInfo->uid;
|
|
||||||
ts = INT64_MIN;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return doPrepareScan(pTaskInfo->pRoot, uid, ts);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
|
||||||
|
|
||||||
return doGetScanStatus(pTaskInfo->pRoot, uid, ts);
|
|
||||||
}
|
|
||||||
#endif
|
|
|
@ -3187,7 +3187,8 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
|
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SSDataBlock** pExistedBlock,
|
||||||
|
bool holdDataInBuf) {
|
||||||
if (pLimitInfo->remainGroupOffset > 0) {
|
if (pLimitInfo->remainGroupOffset > 0) {
|
||||||
if (pLimitInfo->currentGroupId == 0) { // it is the first group
|
if (pLimitInfo->currentGroupId == 0) { // it is the first group
|
||||||
pLimitInfo->currentGroupId = pBlock->info.groupId;
|
pLimitInfo->currentGroupId = pBlock->info.groupId;
|
||||||
|
@ -3208,6 +3209,7 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
|
||||||
pLimitInfo->currentGroupId = pBlock->info.groupId;
|
pLimitInfo->currentGroupId = pBlock->info.groupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// here check for a new group data, we need to handle the data of the previous group.
|
||||||
if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) {
|
if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) {
|
||||||
pLimitInfo->numOfOutputGroups += 1;
|
pLimitInfo->numOfOutputGroups += 1;
|
||||||
if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
|
if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
|
||||||
|
@ -3220,6 +3222,13 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
|
||||||
// reset the value for a new group data
|
// reset the value for a new group data
|
||||||
pLimitInfo->numOfOutputRows = 0;
|
pLimitInfo->numOfOutputRows = 0;
|
||||||
pLimitInfo->remainOffset = pLimitInfo->limit.offset;
|
pLimitInfo->remainOffset = pLimitInfo->limit.offset;
|
||||||
|
|
||||||
|
*pExistedBlock = pBlock;
|
||||||
|
|
||||||
|
// existing rows that belongs to previous group.
|
||||||
|
if (pBlock->info.rows > 0) {
|
||||||
|
return PROJECT_RETRIEVE_DONE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// here we reach the start position, according to the limit/offset requirements.
|
// here we reach the start position, according to the limit/offset requirements.
|
||||||
|
@ -3305,18 +3314,12 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
// The downstream exec may change the value of the newgroup, so use a local variable instead.
|
// The downstream exec may change the value of the newgroup, so use a local variable instead.
|
||||||
qDebug("projection call next");
|
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
qDebug("projection get null");
|
|
||||||
|
|
||||||
/*if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH) {*/
|
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
/*} else if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {*/
|
|
||||||
/*pOperator->status = OP_RES_TO_RETURN;*/
|
|
||||||
/*}*/
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_RETRIEVE) {
|
if (pBlock->info.type == STREAM_RETRIEVE) {
|
||||||
// for stream interval
|
// for stream interval
|
||||||
return pBlock;
|
return pBlock;
|
||||||
|
@ -4133,9 +4136,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
|
||||||
return pTaskInfo;
|
return pTaskInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
|
||||||
STableListInfo* pTableListInfo, const char* idstr);
|
|
||||||
|
|
||||||
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||||
|
|
||||||
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
|
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
|
||||||
|
@ -4292,69 +4292,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
||||||
int32_t groupNum = 0;
|
int32_t groupNum = 0;
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
|
||||||
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
SMetaReader mr = {0};
|
int32_t code = getGroupIdFromTableTags(pHandle->meta, info->uid, group, keyBuf, &info->groupId);
|
||||||
metaReaderInit(&mr, pHandle->meta, 0);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
metaGetTableEntryByUid(&mr, info->uid);
|
return code;
|
||||||
|
|
||||||
SNodeList* groupNew = nodesCloneList(group);
|
|
||||||
|
|
||||||
nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &mr);
|
|
||||||
char* isNull = (char*)keyBuf;
|
|
||||||
char* pStart = (char*)keyBuf + nullFlagSize;
|
|
||||||
|
|
||||||
SNode* pNode;
|
|
||||||
int32_t index = 0;
|
|
||||||
FOREACH(pNode, groupNew) {
|
|
||||||
SNode* pNew = NULL;
|
|
||||||
int32_t code = scalarCalculateConstants(pNode, &pNew);
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
REPLACE_NODE(pNew);
|
|
||||||
} else {
|
|
||||||
taosMemoryFree(keyBuf);
|
|
||||||
nodesDestroyList(groupNew);
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
|
|
||||||
SValueNode* pValue = (SValueNode*)pNew;
|
|
||||||
|
|
||||||
if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
|
|
||||||
isNull[index++] = 1;
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
isNull[index++] = 0;
|
|
||||||
char* data = nodesGetValueFromNode(pValue);
|
|
||||||
if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
|
|
||||||
if (tTagIsJson(data)) {
|
|
||||||
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
|
|
||||||
taosMemoryFree(keyBuf);
|
|
||||||
nodesDestroyList(groupNew);
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
int32_t len = getJsonValueLen(data);
|
|
||||||
memcpy(pStart, data, len);
|
|
||||||
pStart += len;
|
|
||||||
} else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
|
|
||||||
memcpy(pStart, data, varDataTLen(data));
|
|
||||||
pStart += varDataTLen(data);
|
|
||||||
} else {
|
|
||||||
memcpy(pStart, data, pValue->node.resType.bytes);
|
|
||||||
pStart += pValue->node.resType.bytes;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = (int32_t)(pStart - (char*)keyBuf);
|
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
|
||||||
uint64_t groupId = calcGroupId(keyBuf, len);
|
|
||||||
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t));
|
|
||||||
info->groupId = groupId;
|
|
||||||
groupNum++;
|
groupNum++;
|
||||||
|
|
||||||
nodesDestroyList(groupNew);
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(keyBuf);
|
taosMemoryFree(keyBuf);
|
||||||
|
|
||||||
if (pTableListInfo->needSortTableByGroupId) {
|
if (pTableListInfo->needSortTableByGroupId) {
|
||||||
|
|
|
@ -1538,7 +1538,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
||||||
|
|
||||||
pInfo->pTagCond = pTagCond;
|
pInfo->pTagCond = pTagCond;
|
||||||
|
pInfo->pGroupTags = pTableScanNode->pGroupTags;
|
||||||
pInfo->twAggSup = *pTwSup;
|
pInfo->twAggSup = *pTwSup;
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
|
|
Loading…
Reference in New Issue