
Dynamic Multi Tenancy with Spring Boot, Hibernate and Liquibase Part 7: Combining the Shared Database pattern with Database per Group of Tenants (a.k.a. Shard) for additional scalability

// Björn Beskow

In the last part, we implemented the Shared Database with Discriminator Column pattern using Row Level Security. The Shared Database with Discriminator pattern scales very well, but eventually the Shared Database will become a bottleneck. In this part, we’ll tweak the Database per Tenant pattern into Database per Group of Tenants (or Shard). Combined with the Shared Database with Discriminator pattern, we can reach yet another level of scalability.


The limits of Single Database Scalability

When a single database hosts all tenants, the database itself will sooner or later reach its capacity limit. In order to scale beyond that limit, we must be able to scale the database horizontally as well. Such horizontal scaling is commonly referred to as Sharding.


While many modern NoSQL databases (e.g. Cassandra, HBase, HDFS, and MongoDB among others) support Sharding natively at the database level, this is usually not the case for traditional, relational databases. In those cases, the Sharding logic must be provided at the application level. In order to reduce application complexity, this horizontal partitioning of data should also be an isolated Cross Cutting Concern with little or no impact on the application code, just as with the Multi Tenancy data encapsulation.

Luckily, we already have the required mechanism: In a Multi Tenant solution, the Tenant Id is the natural Partition Key which determines which Partition (or Shard) a tenant’s data should be placed in. The sharding logic consists of selecting a database Shard based on a Tenant Id. Conceptually, this is very similar to the logic we already used to implement the Database per Tenant pattern. We should be able to tweak that implementation into a Database per Group of Tenants (i.e. Shard). By combining this strategy with the Shared Database with Discriminator pattern, we can get the best of two worlds: Horizontal partitioning of groups of Tenants into separate Shards, while using a Discriminating Column to separate between tenants within each Shard. The implementation can still be kept well encapsulated, with no impact on the rest of the application code.
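Conceptually, the routing decision is tiny. As a standalone sketch (plain Java with hypothetical names, independent of the Spring and Hibernate wiring used in the rest of this post), lookup-based sharding boils down to resolving a partition key, the Tenant Id, to a shard:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal illustration of lookup-based sharding: the Tenant Id is the
// partition key, and a registry maps each tenant to its shard (here
// represented by just a JDBC URL). All names are hypothetical.
public class ShardRouter {

    private final Map<String, String> tenantToShard = new HashMap<>();

    public void register(String tenantId, String shardUrl) {
        tenantToShard.put(tenantId, shardUrl);
    }

    // Select the shard for a given tenant, failing fast on unknown tenants.
    public String shardFor(String tenantId) {
        String shard = tenantToShard.get(tenantId);
        if (shard == null) {
            throw new IllegalArgumentException("No such tenant: " + tenantId);
        }
        return shard;
    }

    public static void main(String[] args) {
        ShardRouter router = new ShardRouter();
        router.register("tenant1", "jdbc:postgresql://localhost:5432/blog_shard_1");
        router.register("tenant2", "jdbc:postgresql://localhost:5432/blog_shard_1");
        router.register("tenant3", "jdbc:postgresql://localhost:5432/blog_shard_2");
        System.out.println(router.shardFor("tenant3"));
    }
}
```

The production version replaces the in-memory map with the Tenant/Shard metadata and DataSource caches shown in the following sections, but the routing decision stays exactly this simple.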

Let’s get to it!

Tenant and Shard mapping metadata

First, we need to manage metadata about existing Tenants and their Shard relationship:

@Entity
@Getter @Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Tenant {

    @Id
    @Size(max = 30)
    @Column(name = "tenant_id")
    private String tenantId;

    @ManyToOne(fetch = FetchType.LAZY)
    private Shard shard;
}

@Entity
@Getter @Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Shard {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "id", nullable = false)
    private Integer id;

    @Size(max = 256)
    @Column(name = "url")
    private String url;

    @Size(max = 30)
    @Column(name = "db")
    private String db;

    @OneToMany(
        mappedBy = "shard",
        cascade = CascadeType.ALL,
        orphanRemoval = true
    )
    @Builder.Default
    private Set<Tenant> tenants = new HashSet<>();

    public void addTenant(Tenant tenant) {
        tenants.add(tenant);
        tenant.setShard(this);
    }

    public void removeTenant(Tenant tenant) {
        if (tenants.remove(tenant)) {
            tenant.setShard(null);
        } else {
            throw new IllegalStateException(MessageFormat.format("Tenant {0} not found in shard", tenant));
        }
    }
}

public interface TenantRepository extends CrudRepository<Tenant, String> {

    @Query("SELECT DISTINCT t FROM Tenant t JOIN FETCH t.shard WHERE t.tenantId = :tenantId")
    Optional<Tenant> findByTenantId(@Param("tenantId") String tenantId);
}


Multi Tenant ConnectionProvider

We can now tweak the implementation of the MultiTenantConnectionProvider interface. We keep a lazy-loaded cache of Shard->DataSource mappings, as well as a cache of Tenant->Shard mappings. This allows us to select the correct Shard per tenant, and the corresponding DataSource for that Shard:

@Slf4j
public class DynamicShardingMultiTenantConnectionProvider
                extends AbstractDataSourceBasedMultiTenantConnectionProviderImpl {

    private static final String TENANT_POOL_NAME_SUFFIX = "_DataSource";

    @Autowired
    private DataSource masterDataSource;

    @Autowired
    private DataSourceProperties dataSourceProperties;

    @Autowired
    private TenantRepository masterTenantRepository;

    private LoadingCache<String, Tenant> tenants;
    private LoadingCache<Shard, DataSource> shardDataSources;

    @PostConstruct
    private void createCaches() {
        tenants = Caffeine.newBuilder().build(
            tenantId -> masterTenantRepository.findByTenantId(tenantId).orElseThrow(
                            () -> new NoSuchTenantException("No such tenant: " + tenantId)));
        shardDataSources = Caffeine.newBuilder().build(
            shard -> createAndConfigureDataSource(shard));
    }

    @Override
    protected DataSource selectAnyDataSource() {
        return masterDataSource;
    }

    @Override
    protected DataSource selectDataSource(String tenantIdentifier) {
        Tenant tenant = tenants.get(tenantIdentifier);
        DataSource shardDataSource = shardDataSources.get(tenant.getShard());
        return new TenantAwareDataSource(shardDataSource);
    }

    private DataSource createAndConfigureDataSource(Shard shard) {
        HikariDataSource ds = dataSourceProperties.initializeDataSourceBuilder()
                        .type(HikariDataSource.class).build();
        ds.setJdbcUrl(shard.getUrl());
        ds.setPoolName(shard.getDb() + TENANT_POOL_NAME_SUFFIX);
        log.info("Configured datasource: {}", ds.getPoolName());
        return ds;
    }
}

Once we have retrieved the correct DataSource for the Shard that the Tenant is connected to, the Row Level Security constraints take care of separating the data between Tenants within that Shard (all other required details remain the same as in Part 3 and Part 6, and are omitted here for brevity).

Dynamically allocating new Tenants to Shards

The next step is to modify the mechanism for onboarding new Tenants, so that a new Tenant is automatically allocated to a suitable Shard. The actual strategy for Shard allocation will most likely differ between situations, depending on the characteristics of both the Shard topology and the Tenants. Since later changes to the Shard allocation will be difficult, this is an important aspect to get right.

In this example, let’s keep things simple and assume that all Tenants have similar characteristics, and that we would like a Shard topology that starts small but can be dynamically increased when the number of Tenants grow. Let’s further assume that we can estimate the maximal capacity of a Database shard in terms of number of Tenants it can host.

The example shard allocation strategy would thus keep track of the current number of Tenants each Shard contains. New tenants could be allocated to any existing Shard with remaining capacity. If no such Shard exists, a new Shard would be automatically created.
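Before wiring this into Spring, the strategy itself can be sketched as a small, self-contained Java model (hypothetical names, in-memory only; the real implementation delegates to repositories and a shard initializer): allocate to the first shard with free capacity, otherwise open a new one.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the capacity-based allocation strategy: reuse the
// first shard with remaining capacity, otherwise grow the topology by
// creating a new shard. Names and structure are illustrative only.
public class ShardAllocator {

    public static class Shard {
        public final String name;
        public int numberOfTenants;
        public Shard(String name) { this.name = name; }
    }

    private final List<Shard> shards = new ArrayList<>();
    private final int maxTenantsPerShard;

    public ShardAllocator(int maxTenantsPerShard) {
        this.maxTenantsPerShard = maxTenantsPerShard;
    }

    // Returns the shard the tenant was allocated to.
    public Shard allocate(String tenantId) {
        for (Shard shard : shards) {
            if (shard.numberOfTenants < maxTenantsPerShard) {
                shard.numberOfTenants++;
                return shard;
            }
        }
        // No free capacity left: open a new shard for this tenant
        Shard shard = new Shard("blog_shard_" + (shards.size() + 1));
        shard.numberOfTenants = 1;
        shards.add(shard);
        return shard;
    }
}
```

With a capacity of 2 tenants per shard, the first two tenants land in blog_shard_1 and the third opens blog_shard_2, which is exactly the behavior exercised in the test drive at the end of this post.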

Let's start by adding the capacity information to the Shards:

@Entity
public class Shard {
    ...
    @Column(name = "no_of_tenants")
    private int numberOfTenants;
    ...
}

and a Repository that allows us to query for Shards with free capacity:

public interface ShardRepository extends CrudRepository<Shard, Integer> {

    @Query("SELECT s FROM Shard s WHERE s.numberOfTenants < :maxTenants")
    List<Shard> findShardsWithFreeCapacity(@Param("maxTenants") int maxTenants);
}

We can now implement the Shard allocation Strategy:

@Service
@RequiredArgsConstructor
public class ShardManagementServiceImpl implements ShardManagementService {

    private final ShardRepository shardRepository;
    private final ShardInitializer shardInitializer;

    // Bound from the externalized configuration
    private String database;
    private String urlPrefix;
    private int maxTenants;

    private static final String DATABASE_NAME_INFIX = "_shard_";

    @Override
    @Transactional
    public void allocateToShard(Tenant tenant) {
        List<Shard> shardsWithFreeCapacity = shardRepository.findShardsWithFreeCapacity(maxTenants);
        if (!shardsWithFreeCapacity.isEmpty()) {
            // Reuse the first existing Shard with remaining capacity
            Shard shard = shardsWithFreeCapacity.get(0);
            shard.addTenant(tenant);
            shard.setNumberOfTenants(shard.getNumberOfTenants() + 1);
            shardRepository.save(shard);
        } else {
            // No free capacity left: create and initialize a new Shard
            int newShardIndex = ((int) shardRepository.count()) + 1;
            String newShardName = database + DATABASE_NAME_INFIX + newShardIndex;
            String newShardUrl = urlPrefix + newShardName;
            Shard shard = Shard.builder()
                    .url(newShardUrl)
                    .db(newShardName)
                    .build();
            shardInitializer.initializeShard(shard);
            shard.addTenant(tenant);
            shard.setNumberOfTenants(1);
            shardRepository.save(shard);
        }
    }
}

Just as before, we assume an administrative REST endpoint is used to create the new tenants:

@RestController
@RequestMapping("/tenants")
public class TenantsApiController {

    @Autowired
    private TenantManagementService tenantManagementService;

    @PostMapping
    public ResponseEntity<Void> createTenant(@RequestParam String tenantId) {
        tenantManagementService.createTenant(tenantId);
        return new ResponseEntity<>(HttpStatus.OK);
    }
}

@Service
@RequiredArgsConstructor
public class TenantManagementServiceImpl implements TenantManagementService {

    private final ShardManagementService shardManagementService;

    @Override
    @Transactional
    public void createTenant(String tenantId) {
        Tenant tenant = Tenant.builder()
                .tenantId(tenantId)
                .build();
        shardManagementService.allocateToShard(tenant);
    }
}

Creating Shards

The mechanism for creating a new Shard will likely differ depending on the situation. For the sake of simplicity, let's assume we can automatically add new Shards by creating a new database for the Shard using SQL, and then running the Liquibase migration to initialize the new Shard:

@Component
public class ShardInitializerImpl implements ShardInitializer {

    private final JdbcTemplate jdbcTemplate;
    private final LiquibaseProperties liquibaseProperties;
    private final ResourceLoader resourceLoader;

    private final String username;
    private final String password;

    public ShardInitializerImpl(
                    JdbcTemplate jdbcTemplate,
                    @Qualifier("shardLiquibaseProperties") LiquibaseProperties liquibaseProperties,
                    ResourceLoader resourceLoader,
                    @Value("${multitenancy.master.datasource.username}") String username,
                    @Value("${multitenancy.master.datasource.password}") String password) {
        this.jdbcTemplate = jdbcTemplate;
        this.liquibaseProperties = liquibaseProperties;
        this.resourceLoader = resourceLoader;
        this.username = username;
        this.password = password;
    }

    @Override
    @Transactional(propagation = Propagation.NOT_SUPPORTED)
    public void initializeShard(Shard shard) {
        try {
            createDatabase(shard.getDb());
        } catch (DataAccessException e) {
            throw new ShardCreationException("Error when creating db: " + shard.getDb(), e);
        }
        try (Connection connection =
                        DriverManager.getConnection(shard.getUrl(), username, password)) {
            DataSource shardDataSource = new SingleConnectionDataSource(connection, false);
            runLiquibase(shardDataSource);
        } catch (SQLException | LiquibaseException e) {
            throw new ShardCreationException("Error when populating db: " + shard.getDb(), e);
        }
    }

    private void createDatabase(String db) {
        jdbcTemplate.execute(
                        (StatementCallback<Boolean>) stmt -> stmt.execute("CREATE DATABASE " + db));
        jdbcTemplate.execute((StatementCallback<Boolean>) stmt -> stmt
                        .execute("GRANT ALL PRIVILEGES ON DATABASE " + db + " TO " + username));
    }

    private void runLiquibase(DataSource dataSource) throws LiquibaseException {
        SpringLiquibase liquibase = getSpringLiquibase(dataSource);
        liquibase.afterPropertiesSet();
    }

    protected SpringLiquibase getSpringLiquibase(DataSource dataSource) {
        SpringLiquibase liquibase = new SpringLiquibase();
        liquibase.setDataSource(dataSource);
        liquibase.setResourceLoader(resourceLoader);
        liquibase.setChangeLog(liquibaseProperties.getChangeLog());
        return liquibase;
    }
}

Database Migrations

Just as before, we also need to take care of running the Database Migration on all existing Shards:

/**
 * Based on MultiTenantSpringLiquibase, this class provides Liquibase support for
 * Sharding based on a dynamic collection of DataSources.
 */
@Slf4j
@Setter
public class DynamicShardingMultiTenantSpringLiquibase implements InitializingBean, ResourceLoaderAware {

    private ShardRepository shardRepository;

    private LiquibaseProperties liquibaseProperties;

    private String username;

    private String password;

    private ResourceLoader resourceLoader;

    @Override
    public void afterPropertiesSet() throws Exception {
        runOnAllShards(shardRepository.findAll());
    }

    protected void runOnAllShards(Iterable<Shard> shards) {
        for (Shard shard : shards) {
            log.info("Initializing Liquibase for shard " + shard.getDb());
            try (Connection connection = DriverManager.getConnection(shard.getUrl(), username, password)) {
                DataSource shardDataSource = new SingleConnectionDataSource(connection, false);
                SpringLiquibase liquibase = this.getSpringLiquibase(shardDataSource);
                liquibase.afterPropertiesSet();
            } catch (SQLException | LiquibaseException e) {
                log.error("Failed to run Liquibase for shard " + shard.getDb(), e);
            }
            log.info("Liquibase ran for shard " + shard.getDb());
        }
    }

    protected SpringLiquibase getSpringLiquibase(DataSource dataSource) {
        SpringLiquibase liquibase = new SpringLiquibase();
        liquibase.setDataSource(dataSource);
        liquibase.setResourceLoader(resourceLoader);
        liquibase.setChangeLog(liquibaseProperties.getChangeLog());
        return liquibase;
    }
}

Updating the Externalized configuration

The externalized configuration must now also specify the limit of tenants per shard, as well as a template for the created Shard URLs:

multitenancy:
  master:
    database: blog
    datasource:
      url: jdbc:postgresql://localhost:5432/blog
      username: postgres
      password: secret
    liquibase:
      changeLog: classpath:db/changelog/db.changelog-master.yaml
  shard:
    max-tenants: 2
    datasource:
      url-prefix: jdbc:postgresql://localhost:5432/
    liquibase:
      enabled: true
      changeLog: classpath:db/changelog/db.changelog-shard.yaml

What have we achieved?

By combining the Shared Database with Discriminator Column pattern with Database per Shard, we can scale beyond a single relational database. The solution is robust, and scales horizontally as the number of tenants grows.

A fully working, minimalistic example can be found in the GitHub repository for this blog series, in the sharded_shared_database_postgres_rls branch.

Test-driving the solution

Let’s test-drive the solution! Start the tenant management application and the multi tenant service in separate terminal windows. Initially, no tenants exist, and hence no shards.

Now create a first tenant:

> curl -X POST "localhost:8088/tenants?tenantId=tenant1"

The logs from the tenant management application show that a new Shard was created for the new tenant:

2022-03-25 18:11:35.462  INFO: Allocated tenant tenant1 to new shard jdbc:postgresql://localhost:5432/blog_shard_1

Now create a second tenant:

> curl -X POST "localhost:8088/tenants?tenantId=tenant2"

The logs show that the new tenant was allocated to the same shard, which still had capacity:

2022-03-25 18:14:09.070  INFO: Allocated tenant tenant2 to shard jdbc:postgresql://localhost:5432/blog_shard_1

Since the max limit for a shard is set to 2 tenants, it is now full. Creating a third tenant will thus grow the Shard topology by opening up a new Shard:

> curl -X POST "localhost:8088/tenants?tenantId=tenant3"
2022-03-25 18:17:28.276  INFO: Allocated tenant tenant3 to new shard jdbc:postgresql://localhost:5432/blog_shard_2

Now insert some test data for different tenants, verifying that the correct Shard is selected for the tenants:

curl -H "X-TENANT-ID: tenant1" -H "Content-Type: application/json" -X POST -d '{"name":"Product 1"}' localhost:8080/products
curl -H "X-TENANT-ID: tenant2" -H "Content-Type: application/json" -X POST -d '{"name":"Product 2"}' localhost:8080/products
curl -H "X-TENANT-ID: tenant3" -H "Content-Type: application/json" -X POST -d '{"name":"Product 3"}' localhost:8080/products

Then query for the data, and verify that the data is properly isolated between tenants:

curl -H "X-TENANT-ID: tenant1" localhost:8080/products
curl -H "X-TENANT-ID: tenant2" localhost:8080/products
curl -H "X-TENANT-ID: tenant3" localhost:8080/products

That concludes this blog series. Thanks for reading!


