多租户适配
需要从产品底层进行尽量少的改造,能够满足上云之后多租户的数据、缓存、定时任务等隔离
| 条目名称 |
适配方案 |
| 持久层适配 |
支持schema和字段隔离两种方案 |
| quartz定时任务 |
上下文无法获取租户信息,通过JobGroup识别 |
| reids缓存 |
缓存key体现租户id即可 |
| websocket场景 |
从cookie获取、前端调用diwork的api获取租户信息塞到cookie,后端websocket握手后从cookie获取 |
持久层适配
考虑到产品业务的实际情况,要求数据源同时支持schema隔离和字段隔离,持久层的多租户适配业务代码需要零感知、无侵入,适配实现过程如下:
- 表结构改造,追加租户字段、有预置脚本的表,需要跟租户字段建立联合主键;
- 引入动态数据源,动态数据源查询租户信息,切换schema实现租户按schema隔离;
- 改造dao,采用cglib加入Interceptor,在dao层方法的执> 行前加入拦截;
- 用jsqlParser编写sql解析类,第3步拦截到的sql追加租户ID的条件;
动态数据源关键代码
获取租户信息中的schema信息,根据schema信息切换,租户信息通过rest接口获取,考虑了到性能已加ThreadLocal和redis两重缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| protected Connection changeCatalog(Connection con) throws SQLException { String tenantId = InvocationInfoProxy.getTenantid(); if (StringUtils.isBlank(tenantId)) { tenantId = "tenant"; } String catalog = this.getCatalog(tenantId); if (StringUtils.isNotBlank(catalog)) { try { con.setCatalog(catalog); } catch (SQLException e) { logger.error("Error occurred when setting catalog for connection, Tenant ID is {}", tenantId); con.close(); throw e; } } else {
String defaultCatalog = PropertyUtil.getPropertyByKey("jdbc.catalog"); if (StringUtils.isNotBlank(defaultCatalog) && !defaultCatalog.equals(con.getCatalog())) { con.setCatalog(defaultCatalog); logger.info("reset catalog for connection success!"); } } return con; }
|
dao层改造关键代码
通过cglib代理的方式改造dao层,业务代码对租户隔离零感知
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| protected MdmJdbcPersistenceManager createPersistenceManager() throws DbException { if (this.manager == null) { try { this.lock.lock(); if (this.manager == null) { MdmJdbcSession jdbcSession = ProxyFactory.getProxy( MdmJdbcSession.class, new Class[]{JdbcTemplate.class, DBMetaHelper.class}, new Object[] {jdbcTemplate, dbMetaHelper}, new MdmJdbcPersistenceFilter(), NoOp.INSTANCE, new ExecuteInterceptor(jdbcTemplate, dbMetaHelper)); manager = new MdmJdbcPersistenceManager(jdbcTemplate, dbMetaHelper, jdbcSession); } } finally { this.lock.unlock(); } } return (MdmJdbcPersistenceManager) this.manager; }
|
Interceptor 关键代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Override public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable { List<String> sqlList = new ArrayList<>(); try { if (objects[SQL_INDEX] instanceof String) { sqlList = Collections.singletonList(String.valueOf(objects[SQL_INDEX])); } else if (objects[SQL_INDEX] instanceof List) { sqlList = (List<String>) objects[SQL_INDEX]; } } catch (Exception e) { logger.error("Errors occurred when extract sql from jdbc session, details:" + e.getMessage(), e); } if (CollectionUtils.isNotEmpty(sqlList)) { List<String> processedSqlList = MdmSQLParser.process(sqlList); if (CollectionUtils.isNotEmpty(processedSqlList)) { if (objects[SQL_INDEX] instanceof String) { objects[SQL_INDEX] = processedSqlList.get(0); } else if (objects[SQL_INDEX] instanceof List) { objects[SQL_INDEX] = processedSqlList; } } } return methodProxy.invokeSuper(o, objects);
}
|
sqlParser关键代码
采用jSqlParser解析sql语句,并拼接租户id的条件,sql语法解析会消耗部分性能,为了提高性能加入了缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public static String parseAndProcess(String oldSql) throws Exception { String cacheSql = getCache(oldSql); if(!CommonUtils.isNULL(cacheSql)) { return cacheSql; } Statement stmt = CCJSqlParserUtil.parse(oldSql); if (stmt instanceof Select) { Select select = (Select) stmt; logger.debug("select-sql处理前:" + select); checkAndHandleSelectBody(select.getSelectBody()); logger.debug("select-sql处理后:" + select);
} else if (stmt instanceof Insert){ Insert insert = (Insert) stmt; logger.debug("insert-sql处理前:" + stmt); processInsert(insert); logger.debug("insert-sql处理后:" + stmt); } else if (stmt instanceof Update) { Update update = (Update) stmt; logger.debug("update-sql处理前:" + stmt); processUpdate(update); logger.debug("update-sql处理后:" + stmt); } else if (stmt instanceof Delete) { Delete delete = (Delete) stmt; logger.debug("delete-sql处理前:" + stmt); processDelete(delete); logger.debug("delete-sql处理后:" + stmt); } putCache(oldSql, stmt.toString()); return stmt.toString(); }
|
定时任务适配
通过租户开通的回调函数,在其中通过消息驱动的方式,在主数据实例中通过消费方式,来给租户启动定时任务,租户的id即为定时任务的JobGroup,这样job在执行业务逻辑时,可以通过JobGroup获取租户信息,以下代码是通过redis发布订阅方式实现,也可以通过mq实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| final JedisPubSub jedisPubSub = new JedisPubSub() { @SuppressWarnings("unchecked") @Override public void onMessage(String channel, String message) { try { if (CHANNEL.equals(channel) && StringUtils.isNotBlank(message.trim())) { channelMessage[0] = message; InvocationInfoProxy.setTenantid(message); String statisticJobGroup = STATISTIC_ANALYSIS_JOB_GROUP; String statisticIdleJobName = "jobDetailStatisticAnalysisBgJob"; String statisticIdleJobNameByDayJobName = "jobDetailStatisticAnalysisByDayBgJob"; if (!QuartzManager.checkExists(statisticJobGroup, statisticIdleJobName)) { Class idleClazz = Class.forName("com.yonyou.iuapmdm.scheduleJob.task.StatisticAnalysisBgJob"); String idleCronExp = AppUtils.getPropertyValue(APPLICATION_PROPERTIES, "idleCronExp"); QuartzManager.addJob(statisticJobGroup, statisticIdleJobName, idleClazz, null, idleCronExp); } if (!QuartzManager.checkExists(statisticJobGroup, statisticIdleJobNameByDayJobName)) { Class byDayClazz = Class.forName("com.yonyou.iuapmdm.scheduleJob.task.StatisticAnalysisByDayBgJob"); String byDayCronExp = AppUtils.getPropertyValue(APPLICATION_PROPERTIES, "byDayCronExp"); QuartzManager.addJob(statisticJobGroup, statisticIdleJobNameByDayJobName, byDayClazz, null, byDayCronExp); } String tagExpireScanJobGroup = TAG_EXPIRE_SCAN_JOB_GROUP; String tagExpireScanJobName = "jobDetailTagExpireScanBgJob"; if (!QuartzManager.checkExists(tagExpireScanJobGroup, tagExpireScanJobName)) { Class expireScanClz = Class.forName("com.yonyou.iuapmdm.scheduleJob.task.TagExpireScanBgJob"); String expireScanCronExp = AppUtils.getPropertyValue(APPLICATION_PROPERTIES, "tagExpireScanCronExp"); QuartzManager.addJob(tagExpireScanJobGroup, tagExpireScanJobName, expireScanClz, null, expireScanCronExp); } } } catch (Exception e) { logger.error("Error occurred adding statistic analysis job, details:" + e.getMessage(), e); } } }; Thread daemon = new Thread(() -> { MdmCacheManager.getInstance().subscribe(jedisPubSub, CHANNEL); });
|
redis缓存适配
比较简单,构造key的时候体现tenantId即可
1 2 3 4 5 6 7
| private String buildKey(String key) { String tenantId = InvocationInfoProxy.getTenantid(); if (StringUtils.isBlank(tenantId)) { tenantId = "tenant"; } return StringUtils.join(new String[]{tenantId, key}, ":"); }
|
websocket场景
从WebSocketSession中获取cookie信息,设置上下文即可
文章作者:米兰
原始链接:https://blog.milanchen.site/posts/multi-tenant.html
版权声明:转载请声明出处