Merge remote-tracking branch 'origin/feature/3_liaohj' into feature/3_liaohj
This commit is contained in:
commit
f88d80844e
|
@ -3719,8 +3719,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
|
tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%"PRId64", query range:%" PRId64 " - %" PRId64 " in query %s",
|
||||||
pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
|
pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey, pReader->idStr);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -533,6 +533,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
|
||||||
taosArrayPush(rsp.pArray, &cRsp);
|
taosArrayPush(rsp.pArray, &cRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
|
||||||
tqUpdateTbUidList(pVnode->pTq, tbUids, true);
|
tqUpdateTbUidList(pVnode->pTq, tbUids, true);
|
||||||
if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
|
if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
@ -885,8 +886,9 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
if (NULL != submitBlkRsp.pMeta) {
|
if (NULL != submitBlkRsp.pMeta) {
|
||||||
vnodeUpdateMetaRsp(pVnode, submitBlkRsp.pMeta);
|
vnodeUpdateMetaRsp(pVnode, submitBlkRsp.pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosArrayPush(newTbUids, &createTbReq.uid);
|
||||||
}
|
}
|
||||||
taosArrayPush(newTbUids, &createTbReq.uid);
|
|
||||||
|
|
||||||
submitBlkRsp.uid = createTbReq.uid;
|
submitBlkRsp.uid = createTbReq.uid;
|
||||||
submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
|
submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
|
||||||
|
@ -917,6 +919,11 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
submitRsp.affectedRows += submitBlkRsp.affectedRows;
|
submitRsp.affectedRows += submitBlkRsp.affectedRows;
|
||||||
taosArrayPush(submitRsp.pArray, &submitBlkRsp);
|
taosArrayPush(submitRsp.pArray, &submitBlkRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (taosArrayGetSize(newTbUids) > 0) {
|
||||||
|
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), (int32_t)taosArrayGetSize(newTbUids));
|
||||||
|
}
|
||||||
|
|
||||||
tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
|
tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
|
|
@ -265,6 +265,15 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
|
||||||
|
|
||||||
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
|
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
STableListInfo* pListInfo = &pTaskInfo->tableqinfoList;
|
||||||
|
|
||||||
|
if (isAdd) {
|
||||||
|
qDebug("add %d tables id into query list, %s", (int32_t) taosArrayGetSize(tableIdList), pTaskInfo->id.str);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pListInfo->map == NULL) {
|
||||||
|
pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
|
}
|
||||||
|
|
||||||
// traverse to the stream scanner node to add this table id
|
// traverse to the stream scanner node to add this table id
|
||||||
SOperatorInfo* pInfo = pTaskInfo->pRoot;
|
SOperatorInfo* pInfo = pTaskInfo->pRoot;
|
||||||
|
@ -311,13 +320,19 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
|
bool exists = false;
|
||||||
if (pTaskInfo->tableqinfoList.map == NULL) {
|
for (int32_t k = 0; k < taosArrayGetSize(pListInfo->pTableList); ++k) {
|
||||||
pTaskInfo->tableqinfoList.map =
|
STableKeyInfo* pKeyInfo = taosArrayGet(pListInfo->pTableList, k);
|
||||||
taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
if (pKeyInfo->uid == keyInfo.uid) {
|
||||||
|
qWarn("ignore duplicated query table uid:%" PRIu64 " added, %s", pKeyInfo->uid, pTaskInfo->id.str);
|
||||||
|
exists = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
|
if (!exists) {
|
||||||
|
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
|
||||||
|
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (keyBuf != NULL) {
|
if (keyBuf != NULL) {
|
||||||
|
|
|
@ -617,19 +617,28 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// if scan table by table
|
// if scan table by table
|
||||||
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
|
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
|
||||||
if (pInfo->noTable) return NULL;
|
if (pInfo->noTable) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* result = doTableScanGroup(pOperator);
|
SSDataBlock* result = doTableScanGroup(pOperator);
|
||||||
if (result) {
|
if (result) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if no data, switch to next table and continue scan
|
// if no data, switch to next table and continue scan
|
||||||
pInfo->currentTable++;
|
pInfo->currentTable++;
|
||||||
if (pInfo->currentTable >= taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList)) {
|
if (pInfo->currentTable >= numOfTables) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
|
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
|
||||||
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
|
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
|
||||||
|
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables, pInfo->currentTable, pTaskInfo->id.str);
|
||||||
|
|
||||||
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
|
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
|
||||||
pInfo->scanTimes = 0;
|
pInfo->scanTimes = 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue