[TD-225] refactor codes.
This commit is contained in:
parent
40016c5eb8
commit
e05a8fc80f
|
@ -110,11 +110,12 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint
|
||||||
uint32_t offset);
|
uint32_t offset);
|
||||||
|
|
||||||
void* tscDestroyBlockArrayList(SArray* pDataBlockList);
|
void* tscDestroyBlockArrayList(SArray* pDataBlockList);
|
||||||
|
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable);
|
||||||
|
|
||||||
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
|
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
|
||||||
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pDataList);
|
int32_t tscMergeTableDataBlocks(SSqlObj* pSql);
|
||||||
int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size,
|
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
|
||||||
int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
|
STableDataBlocks** dataBlocks, SArray* pBlockList);
|
||||||
STableDataBlocks** dataBlocks);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* for the projection query on metric or point interpolation query on metric,
|
* for the projection query on metric or point interpolation query on metric,
|
||||||
|
@ -275,6 +276,8 @@ void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
|
||||||
bool hasMoreVnodesToTry(SSqlObj *pSql);
|
bool hasMoreVnodesToTry(SSqlObj *pSql);
|
||||||
bool hasMoreClauseToTry(SSqlObj* pSql);
|
bool hasMoreClauseToTry(SSqlObj* pSql);
|
||||||
|
|
||||||
|
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache);
|
||||||
|
|
||||||
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
|
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
|
||||||
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
|
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
|
||||||
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
|
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
|
||||||
|
|
|
@ -251,7 +251,7 @@ typedef struct {
|
||||||
STableMeta **pTableMetaList; // all involved tableMeta list of current insert sql statement.
|
STableMeta **pTableMetaList; // all involved tableMeta list of current insert sql statement.
|
||||||
int32_t numOfTables;
|
int32_t numOfTables;
|
||||||
|
|
||||||
SHashObj *pTableList; // data block for each table
|
SHashObj *pTableBlockHashList; // data block for each table
|
||||||
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
|
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
|
||||||
} SSqlCmd;
|
} SSqlCmd;
|
||||||
|
|
||||||
|
|
|
@ -692,7 +692,7 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, SParsedDataColI
|
||||||
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
|
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
|
||||||
|
|
||||||
STableDataBlocks *dataBuf = NULL;
|
STableDataBlocks *dataBuf = NULL;
|
||||||
int32_t ret = tscGetDataBlockFromList(pCmd->pTableList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
|
int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
|
||||||
sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name, pTableMeta, &dataBuf, NULL);
|
sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name, pTableMeta, &dataBuf, NULL);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -1055,9 +1055,9 @@ int tsParseInsertSql(SSqlObj *pSql) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == pCmd->pTableList) {
|
if (NULL == pCmd->pTableBlockHashList) {
|
||||||
pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
pCmd->pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
||||||
if (NULL == pCmd->pTableList) {
|
if (NULL == pCmd->pTableBlockHashList) {
|
||||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
goto _clean;
|
goto _clean;
|
||||||
}
|
}
|
||||||
|
@ -1065,7 +1065,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
|
||||||
str = pCmd->curSql;
|
str = pCmd->curSql;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("%p create data block list hashList:%p", pSql, pCmd->pTableList);
|
tscDebug("%p create data block list hashList:%p", pSql, pCmd->pTableBlockHashList);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
|
@ -1270,7 +1270,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
|
||||||
goto _clean;
|
goto _clean;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosHashGetSize(pCmd->pTableList) > 0) { // merge according to vgId
|
if (taosHashGetSize(pCmd->pTableBlockHashList) > 0) { // merge according to vgId
|
||||||
if ((code = tscMergeTableDataBlocks(pSql)) != TSDB_CODE_SUCCESS) {
|
if ((code = tscMergeTableDataBlocks(pSql)) != TSDB_CODE_SUCCESS) {
|
||||||
goto _clean;
|
goto _clean;
|
||||||
}
|
}
|
||||||
|
|
|
@ -800,7 +800,7 @@ static int insertStmtExecute(STscStmt* stmt) {
|
||||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||||
assert(pCmd->numOfClause == 1);
|
assert(pCmd->numOfClause == 1);
|
||||||
|
|
||||||
if (taosHashGetSize(pCmd->pTableList) > 0) {
|
if (taosHashGetSize(pCmd->pTableBlockHashList) > 0) {
|
||||||
// merge according to vgid
|
// merge according to vgid
|
||||||
int code = tscMergeTableDataBlocks(stmt->pSql);
|
int code = tscMergeTableDataBlocks(stmt->pSql);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -900,9 +900,9 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
|
||||||
strtolower(pSql->sqlstr, sql);
|
strtolower(pSql->sqlstr, sql);
|
||||||
|
|
||||||
pCmd->curSql = NULL;
|
pCmd->curSql = NULL;
|
||||||
if (NULL != pCmd->pTableList) {
|
if (NULL != pCmd->pTableBlockHashList) {
|
||||||
taosHashCleanup(pCmd->pTableList);
|
taosHashCleanup(pCmd->pTableBlockHashList);
|
||||||
pCmd->pTableList = NULL;
|
pCmd->pTableBlockHashList = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSql->fp = asyncCallback;
|
pSql->fp = asyncCallback;
|
||||||
|
|
|
@ -409,7 +409,7 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) {
|
||||||
|
|
||||||
tfree(pCmd->pTableMetaList);
|
tfree(pCmd->pTableMetaList);
|
||||||
|
|
||||||
pCmd->pTableList = tscDestroyBlockHashTable(pCmd->pTableList);
|
pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList);
|
||||||
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
||||||
tscFreeQueryInfo(pCmd, removeFromCache);
|
tscFreeQueryInfo(pCmd, removeFromCache);
|
||||||
}
|
}
|
||||||
|
@ -788,18 +788,18 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void extractTableMeta(SSqlCmd* pCmd) {
|
static void extractTableMeta(SSqlCmd* pCmd) {
|
||||||
pCmd->numOfTables = taosHashGetSize(pCmd->pTableList);
|
pCmd->numOfTables = taosHashGetSize(pCmd->pTableBlockHashList);
|
||||||
pCmd->pTableMetaList = calloc(pCmd->numOfTables, POINTER_BYTES);
|
pCmd->pTableMetaList = calloc(pCmd->numOfTables, POINTER_BYTES);
|
||||||
|
|
||||||
STableDataBlocks **p1 = taosHashIterate(pCmd->pTableList, NULL);
|
STableDataBlocks **p1 = taosHashIterate(pCmd->pTableBlockHashList, NULL);
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
while(p1) {
|
while(p1) {
|
||||||
STableDataBlocks* pBlocks = *p1;
|
STableDataBlocks* pBlocks = *p1;
|
||||||
pCmd->pTableMetaList[i++] = taosCacheTransfer(tscMetaCache, (void**) &pBlocks->pTableMeta);
|
pCmd->pTableMetaList[i++] = taosCacheTransfer(tscMetaCache, (void**) &pBlocks->pTableMeta);
|
||||||
p1 = taosHashIterate(pCmd->pTableList, p1);
|
p1 = taosHashIterate(pCmd->pTableBlockHashList, p1);
|
||||||
}
|
}
|
||||||
|
|
||||||
pCmd->pTableList = tscDestroyBlockHashTable(pCmd->pTableList);
|
pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
|
int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
|
||||||
|
@ -808,7 +808,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
|
||||||
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
||||||
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
|
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
|
||||||
|
|
||||||
STableDataBlocks** p = taosHashIterate(pCmd->pTableList, NULL);
|
STableDataBlocks** p = taosHashIterate(pCmd->pTableBlockHashList, NULL);
|
||||||
|
|
||||||
STableDataBlocks* pOneTableBlock = *p;
|
STableDataBlocks* pOneTableBlock = *p;
|
||||||
while(pOneTableBlock) {
|
while(pOneTableBlock) {
|
||||||
|
@ -873,7 +873,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
|
||||||
pBlocks->dataLen = htonl(finalLen);
|
pBlocks->dataLen = htonl(finalLen);
|
||||||
dataBuf->numOfTables += 1;
|
dataBuf->numOfTables += 1;
|
||||||
|
|
||||||
p = taosHashIterate(pCmd->pTableList, p);
|
p = taosHashIterate(pCmd->pTableBlockHashList, p);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue