Merge pull request #985 from taosdata/feature/liaohj

Feature/liaohj
This commit is contained in:
slguan 2019-12-21 15:18:32 +08:00 committed by GitHub
commit 07b32ec20e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 154 additions and 110 deletions

View File

@ -66,19 +66,24 @@ typedef struct SJoinSubquerySupporter {
char path[PATH_MAX]; // temporary file path char path[PATH_MAX]; // temporary file path
} SJoinSubquerySupporter; } SJoinSubquerySupporter;
void tscDestroyDataBlock(STableDataBlocks* pDataBlock); int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
STableDataBlocks* tscCreateDataBlock(size_t initialBufSize, int32_t rowSize, int32_t startOffset, const char* name); STableDataBlocks** dataBlocks);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks); void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes,
uint32_t offset); uint32_t offset);
SDataBlockList* tscCreateBlockArrayList(); SDataBlockList* tscCreateBlockArrayList();
void* tscDestroyBlockArrayList(SDataBlockList* pList); void* tscDestroyBlockArrayList(SDataBlockList* pList);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
void tscFreeUnusedDataBlocks(SDataBlockList* pList); void tscFreeUnusedDataBlocks(SDataBlockList* pList);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList); int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList);
STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId); int32_t startOffset, int32_t rowSize, const char* tableId,
STableDataBlocks** dataBlocks);
SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx); SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx);
SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);

View File

@ -653,9 +653,12 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta; SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta;
STableDataBlocks *dataBuf = STableDataBlocks *dataBuf = NULL;
tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pMeterMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, int32_t ret = tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pMeterMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SShellSubmitBlock), pMeterMeta->rowSize, pMeterMetaInfo->name); sizeof(SShellSubmitBlock), pMeterMeta->rowSize, pMeterMetaInfo->name, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
int32_t maxNumOfRows = tscAllocateMemIfNeed(dataBuf, pMeterMeta->rowSize); int32_t maxNumOfRows = tscAllocateMemIfNeed(dataBuf, pMeterMeta->rowSize);
if (0 == maxNumOfRows) { if (0 == maxNumOfRows) {
@ -1072,8 +1075,12 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
strcpy(fname, full_path.we_wordv[0]); strcpy(fname, full_path.we_wordv[0]);
wordfree(&full_path); wordfree(&full_path);
STableDataBlocks *pDataBlock = tscCreateDataBlock(PATH_MAX, pMeterMetaInfo->pMeterMeta->rowSize, STableDataBlocks *pDataBlock = NULL;
sizeof(SShellSubmitBlock), pMeterMetaInfo->name); int32_t ret = tscCreateDataBlock(PATH_MAX, pMeterMetaInfo->pMeterMeta->rowSize, sizeof(SShellSubmitBlock),
pMeterMetaInfo->name, &pDataBlock);
if (ret != TSDB_CODE_SUCCESS) {
goto _error_clean;
}
tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock); tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock);
strcpy(pDataBlock->filename, fname); strcpy(pDataBlock->filename, fname);
@ -1314,8 +1321,12 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
int32_t rowSize = pMeterMeta->rowSize; int32_t rowSize = pMeterMeta->rowSize;
pCmd->pDataBlocks = tscCreateBlockArrayList(); pCmd->pDataBlocks = tscCreateBlockArrayList();
STableDataBlocks *pTableDataBlock = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize, STableDataBlocks *pTableDataBlock = NULL;
sizeof(SShellSubmitBlock), pMeterMetaInfo->name); int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize, sizeof(SShellSubmitBlock),
pMeterMetaInfo->name, &pTableDataBlock);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
}
tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock); tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock);

View File

@ -22,8 +22,8 @@
#include "tscJoinProcess.h" #include "tscJoinProcess.h"
#include "tscProfile.h" #include "tscProfile.h"
#include "tscSecondaryMerge.h" #include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "tscUtil.h"
#include "tsclient.h" #include "tsclient.h"
#include "tsqldef.h" #include "tsqldef.h"
#include "ttimer.h" #include "ttimer.h"
@ -72,8 +72,7 @@ void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* str, uint64_t uid) {
char* tmp = calloc(1, bufSize); char* tmp = calloc(1, bufSize);
int32_t keyLen = snprintf(tmp, bufSize, "%s,%s,%s,%d,%s,[%s],%d", pMeterMetaInfo->name, int32_t keyLen = snprintf(tmp, bufSize, "%s,%s,%s,%d,%s,[%s],%d", pMeterMetaInfo->name,
(cond != NULL ? cond->cond : NULL), (cond != NULL ? cond->cond : NULL), (tbnameCondLen > 0 ? pTagCond->tbnameCond.cond : NULL),
(tbnameCondLen > 0 ? pTagCond->tbnameCond.cond : NULL),
pTagCond->relType, join, tagIdBuf, pCmd->groupbyExpr.orderType); pTagCond->relType, join, tagIdBuf, pCmd->groupbyExpr.orderType);
assert(keyLen <= bufSize); assert(keyLen <= bufSize);
@ -245,8 +244,8 @@ bool tscProjectionQueryOnMetric(SSqlCmd* pCmd) {
// for project query, only the following two function is allowed // for project query, only the following two function is allowed
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
int32_t functionId = tscSqlExprGet(pCmd, i)->functionId; int32_t functionId = tscSqlExprGet(pCmd, i)->functionId;
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG &&
functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) { functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) {
return false; return false;
} }
} }
@ -587,12 +586,17 @@ void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
* @param rowSize * @param rowSize
* @param startOffset * @param startOffset
* @param name * @param name
* @param pMeterMeta the ownership of pMeterMeta should be transfer to STableDataBlocks * @param dataBlocks
* @return * @return
*/ */
STableDataBlocks* tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name) { int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
STableDataBlocks** dataBlocks) {
STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks)); STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
if (dataBuf == NULL) {
tscError("failed to allocated memory, reason:%s", strerror(errno));
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
dataBuf->nAllocSize = (uint32_t)initialSize; dataBuf->nAllocSize = (uint32_t)initialSize;
dataBuf->pData = calloc(1, dataBuf->nAllocSize); dataBuf->pData = calloc(1, dataBuf->nAllocSize);
dataBuf->ordered = true; dataBuf->ordered = true;
@ -604,29 +608,43 @@ STableDataBlocks* tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_
strncpy(dataBuf->meterId, name, TSDB_METER_ID_LEN); strncpy(dataBuf->meterId, name, TSDB_METER_ID_LEN);
// sure that the metermeta must be in the local client cache /*
* The metermeta may be released since the metermeta cache are completed clean by other thread
* due to operation such as drop database.
*/
dataBuf->pMeterMeta = taosGetDataFromCache(tscCacheHandle, dataBuf->meterId); dataBuf->pMeterMeta = taosGetDataFromCache(tscCacheHandle, dataBuf->meterId);
assert(dataBuf->pMeterMeta != NULL && initialSize > 0); assert(initialSize > 0);
return dataBuf; if (dataBuf->pMeterMeta == NULL) {
tfree(dataBuf);
return TSDB_CODE_QUERY_CACHE_ERASED;
} else {
*dataBlocks = dataBuf;
return TSDB_CODE_SUCCESS;
}
} }
STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId) { int32_t startOffset, int32_t rowSize, const char* tableId,
STableDataBlocks* dataBuf = NULL; STableDataBlocks** dataBlocks) {
*dataBlocks = NULL;
STableDataBlocks** t1 = (STableDataBlocks**) taosGetIntHashData(pHashList, id); STableDataBlocks** t1 = (STableDataBlocks**) taosGetIntHashData(pHashList, id);
if (t1 != NULL) { if (t1 != NULL) {
dataBuf = *t1; *dataBlocks = *t1;
} }
if (dataBuf == NULL) { if (*dataBlocks == NULL) {
dataBuf = tscCreateDataBlock((size_t)size, rowSize, startOffset, tableId); int32_t ret = tscCreateDataBlock((size_t) size, rowSize, startOffset, tableId, dataBlocks);
dataBuf = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)&dataBuf); if (ret != TSDB_CODE_SUCCESS) {
tscAppendDataBlock(pDataBlockList, dataBuf); return ret;
} }
return dataBuf; *dataBlocks = *(STableDataBlocks**)taosAddIntHash(pHashList, id, (char*)(*dataBlocks));
tscAppendDataBlock(pDataBlockList, *dataBlocks);
}
return TSDB_CODE_SUCCESS;
} }
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) { int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) {
@ -638,9 +656,15 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
for (int32_t i = 0; i < pTableDataBlockList->nSize; ++i) { for (int32_t i = 0; i < pTableDataBlockList->nSize; ++i) {
STableDataBlocks* pOneTableBlock = pTableDataBlockList->pData[i]; STableDataBlocks* pOneTableBlock = pTableDataBlockList->pData[i];
STableDataBlocks* dataBuf = STableDataBlocks* dataBuf = NULL;
tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgid, TSDB_PAYLOAD_SIZE, int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgid,
tsInsertHeadSize, 0, pOneTableBlock->meterId); TSDB_PAYLOAD_SIZE, tsInsertHeadSize, 0, pOneTableBlock->meterId, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) {
tscError("%p failed to allocate the data buffer block for merging table data", pSql);
tscDestroyBlockArrayList(pTableDataBlockList);
return ret;
}
int64_t destSize = dataBuf->size + pOneTableBlock->size; int64_t destSize = dataBuf->size + pOneTableBlock->size;
if (dataBuf->nAllocSize < destSize) { if (dataBuf->nAllocSize < destSize) {
@ -1780,7 +1804,9 @@ bool tscIsUpdateQuery(STscObj* pObj) {
SSqlCmd* pCmd = &pObj->pSql->cmd; SSqlCmd* pCmd = &pObj->pSql->cmd;
return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) ||
TSDB_SQL_USE_DB == pCmd->command) ? 1 : 0; TSDB_SQL_USE_DB == pCmd->command)
? 1
: 0;
} }
int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* sql) { int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* sql) {
@ -1816,4 +1842,3 @@ bool tscHasReachLimitation(SSqlObj* pSql) {
return (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit); return (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit);
} }

View File

@ -137,8 +137,9 @@ extern "C" {
#define TSDB_CODE_INVALID_VNODE_STATUS 116 #define TSDB_CODE_INVALID_VNODE_STATUS 116
#define TSDB_CODE_FAILED_TO_LOCK_RESOURCES 117 #define TSDB_CODE_FAILED_TO_LOCK_RESOURCES 117
#define TSDB_CODE_TABLE_ID_MISMATCH 118 #define TSDB_CODE_TABLE_ID_MISMATCH 118
#define TSDB_CODE_QUERY_CACHE_ERASED 119
#define TSDB_CODE_MAX_ERROR_CODE 119 #define TSDB_CODE_MAX_ERROR_CODE 120
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -242,5 +242,6 @@ char *tsError[] = {"success",
"invalid table id", // 115 "invalid table id", // 115
"invalid vnode status", "invalid vnode status",
"failed to lock resources", "failed to lock resources",
"table id/uid mismatch", // 118 "table id/uid mismatch",
"client query cache erased", // 119
}; };

View File

@ -97,10 +97,11 @@ void tExtMemBufferDestroy(tExtMemBuffer **pMemBuffer) {
// close temp file // close temp file
if ((*pMemBuffer)->dataFile != 0) { if ((*pMemBuffer)->dataFile != 0) {
int32_t ret = fclose((*pMemBuffer)->dataFile); if (fclose((*pMemBuffer)->dataFile) != 0) {
if (ret != 0) {
pError("failed to close file:%s, reason:%s", (*pMemBuffer)->dataFilePath, strerror(errno)); pError("failed to close file:%s, reason:%s", (*pMemBuffer)->dataFilePath, strerror(errno));
} }
pTrace("remove temp file:%s for external buffer", (*pMemBuffer)->dataFilePath);
unlink((*pMemBuffer)->dataFilePath); unlink((*pMemBuffer)->dataFilePath);
} }