Content uniformity tests process Implemention in C++ and Python codes. Using C++ to speed codes for 100 times.

Content uniformity (CU) is crucial for drug products. In the Chinese Pharmacopoeia(ChP) and the United States Pharmacopedia (USP), CU tests are similar but still have some differences. To find out the how the CU tests in ChP and USP (denoted as ChPCU and USPCU) differ under different circumstances, we implemented a simulation study based on Monte-Carlo. The results of the study can be found in the paper.

In this blog, I released the code of the Monte Carlo simulation process in both Python and C++. Just for myself to remember the code details.

## The Python version

### CUtest class

The core of the simulation experiement is the CU test. We used Monte-Carlo method to call the CU test repeatedly and then got the accuracy and other metrics of ChPCU and USPCU. Therefore, the CU test is wrapped into a class firstly.

# CUtest.py

# -*- coding: utf-8 -*-
import numpy as np

class ChPtest():
"""ChPtest provides Content Uniformity test in Chinese Pharmacopeia

"""

def __init__(self, s1, s2, k1=2.2, k2=0.25, k3=1.7, L=15):
"""input two (numpy) arrays, s1 and s2, as the sampling results
in the fist stage and the second stage.

Parameters
----------
s1 : array like
the sampling result in the first stage of content uniformity test
size should be equal to 10.
s2 : array like
the sampling result in the second stage of content uniformity test
size should be equal to 20.
k1 : float
parameter in ChP 0941
k2: float
parameter in ChP 0941
k3: float
parameter in ChP 0941
L: float
parameter in ChP 0941
"""

if len(s1) != 10:
raise ValueError('THe length of s1 should be 10')
elif len(s2) != 20:
raise ValueError('The length of s2 should be 20')
else:
self.stage1 = s1
self.__s2 = s2
self.k1 = k1
self.k2 = k2
self.k3 = k3
self.L = L
self.stage2 = np.concatenate([s1, s2])    # merge s1 and s2 into stage2
self.mean_s1 = np.mean(self.stage1)
# sd: degree of freedom = n-1, in accordance with ChP 0941
self.sd_s1 = np.std(self.stage1, ddof=1)    # unbiased estimator
self.mean_s2 = np.mean(self.stage2)
self.sd_s2 = np.std(self.stage2, ddof=1)

def result(self):
"""returns ChPtest result

Returns
-----------
[pass_s1, pass_s2, fail_s1, fail_s2]: numpy arrays
pass_s1: 0 or 1. If passes in stage 1 test, the value is 1.
pass_s2: 0 or 1. If passes in stage 2 test, the value is 1.
fail_s1: 0 or 1. If fails in stage 1 test, the value is 1.
fail_s2: 0 or 1. If fails in stage 2 test, the value is 1.

"""
# initialization with 0
pass_s1 = 0
fail_s1 = 0
pass_s2 = 0
fail_s2 = 0

# if passes stage 1 test, pass_1 add 1
# if fails stage 1 test, fail_1 add 1
# if keeps unclear, go to stage 2 test
Ave = abs(100 - self.mean_s1)
if Ave + self.k1 * self.sd_s1 <= self.L:    # if A + k1 * sd < L, pass
pass_s1 = 1
elif Ave + self.sd_s1 > self.L:    # if A + sd > L, fail
fail_s1 = 1

# stage 2 test
else:
Ave = abs(100 - self.mean_s2)    # recalculate Ave
if Ave <= self.k2 * self.L:    # if A <= k2 * L
# if A^2 + sd^2 <= k2 * L, pass
if Ave ** 2 + self.sd_s2 ** 2 <= self.k2 * self.L ** 2:
pass_s2 = 1
else:
fail_s2 = 1
else:
# if A > k2 * L and A + k3 * sd <= L, pass
if Ave + self.k3 * self.sd_s2 <= self.L:
pass_s2 = 1
else:
fail_s2 = 1
return np.array([pass_s1, pass_s2, fail_s1, fail_s2])

class USPtest():
"""USPtest provides Content Uniformity test in US Pharmacopeia.

"""
def __init__(self, s1, s2, k1=2.4, k2=2.0, L1=15, L2=25):
if len(s1) != 10:
raise ValueError('THe length of s1 should be 10')
elif len(s2) != 20:
raise ValueError('The length of s2 should be 20')
else:
self.stage1 = s1
self.__s2 = s2
self.stage2 = np.concatenate([s1, s2])    # merge s1 and s2 into stage2
self.k1 = k1
self.k2 = k2
self.L1 = L1
self.L2 = L2
self.mean_s1 = np.mean(self.stage1)
# sd: degree of freedom = n-1, in accordance with USP 905
self.sd_s1 = np.std(self.stage1, ddof=1)    # unbiased estimator
self.mean_s2 = np.mean(self.stage2)
self.sd_s2 = np.std(self.stage2, ddof=1)

def result(self):
"""returns the USPtest result

Returns
-----------
[pass_s1, pass_s2, fail_s1, fail_s2]: numpy arrays
pass_s1: 0 or 1. If passes in stage 1 test, the value is 1.
pass_s2: 0 or 1. If passes in stage 2 test, the value is 1.
fail_s1: 0 or 1. If fails in stage 1 test, the value is 1.
Indeed, the value of fail_s1 should always be 0 because USP 905 goes to
the stage 2 test directly instead of giving a result of fail
if it fails in the stage 1 test.

fail_s2: 0 or 1. If fails in stage 2 test, the value is 1.

"""
pass_s1 = 0
fail_s1 = 0    # no fail in stage 1 test indeed
pass_s2 = 0
fail_s2 = 0

# if passes stage 1 test, pass_s1 add 1
# if fails stage 1 test, go to stage 2 test
if self.mean_s1 < 98.5:
Ave = 98.5 - self.mean_s1
elif self.mean_s1 > 101.5:
Ave = self.mean_s1 - 101.5
else:
Ave = 0

# stage 1 test
if Ave + self.k1 * self.sd_s1 <= self.L1:    # if A + k1 * sd <= L1
pass_s1 = 1

# stage 2 test
else:
if self.mean_s2 < 98.5:
Ave2 = 98.5 - self.mean_s2
limit = 98.5
elif self.mean_s2 > 101.5:
Ave2 = self.mean_s2 - 101.5
limit = 101.5
else:
Ave2 = 0
limit = self.mean_s2

# Ave + k2 * sd <= L1 and all the tablets fall in the quality range
if ((Ave2 + self.k2 * self.sd_s2 <= self.L1) &
(np.all(self.stage2 >= (1 - 0.01 * self.L2) * limit)) &
(np.all(self.stage2 <= (1 + 0.01 * self.L2) * limit))):
pass_s2 = 1
else:
fail_s2 = 1

return np.array([pass_s1, pass_s2, fail_s1, fail_s2])


### CUsim methods

The Monte-Carlo experiments are implemented in the CUsim.py. Two methods are provided, sim_sd and sim_mu.

# CUsim.py

import numpy as np
from CUtest import ChPtest, USPtest

def sim_sd(mu, start, end, sd_num, sim_num=1000, n1=10, n2=20,
low_lvl=85, upp_lvl=115, ck1=2.2, ck2=0.25, ck3=1.7, cL=15,
uk1=2.4, uk2=2.0, uL1=15, uL2=25):
"""sim_sd provides a method to simulate CUtest with nomal distributions
which are under a given mean value and different standard deviations(sd).
sd starting from start and ending with end, are evenly spaced.

Parameters
----------
mu : float
The mean value of the normal distribution.
start : float
The starting standard deviation of the normal distribution.
end : float
The ending standard deviation of the normal distribution.
sd_num : int
Number of sd to generate.
sim_num : int, optional
Number of test simulations to generate. Default is 1000.
n1 : int, optional
Number of samples tested in the stage 1.
Default is 10, which complies with USP 905 and ChP 0941.
n2 : int, optional
Number of samples tested in the stage 2.
Default is 20, which complies with USP 905 and ChP 0941.
low_lvl : int, optional
The lower content bound of qualified tablets.
Default is 85, which means the content should not be less than 85%
of the label claim.
upp_lvl : int, optional
The upper content bound of qualified tablets.
Default is 115, which means the content should not be greater than 115%
of the label claim.
ck1 : float, optional
The parameter in CUtest.ChPtest(), default is 2.2.
ck2 : float, optional
The parameter in CUtest.ChPtest(), default is 0.25.
ck3 : float, optional
The parameter in CUtest.ChPtest(), default is 1.7.
cL : float, optional
The paramter in CUtest.ChPtest(), default is 15.
uk1 : float, optional
The parameter in CUtest.USPtest(), default is 2.4.
uk2 : float, optional
The parameter in CUtest.USPtest(), default is 2.0.
uL1 : float, optional
The parameter in CUtest.USPtest(), default is 15.
uL2 : float, optional
The parameter in CUtest.USPtest(), default is 25.

Returns
-------
Returns a list, containing failrate, chp_acc_s1, chp_acc, usp_acc_s1
and usp_acc

failrate: numpy array
The rate of unqualified tablets in each simulated batch.
The length eqauls to sd_num.
chp_acc_s1: numpy array
The stage 1 test accuracy of ChP 0941
chp_acc: numpy array
The test accuracy of ChP 0941.
usp_acc_s1: numpy array
The stage 1 test accuracy of USP 905.
usp_acc: numpy array
The test accuracy of USP 905.

Note
---------
In this function, accuray = (TP + TN) / (TP + FP + TN + FN)
"""

n_batch = 20000    # simulate 20k tablets in one batch
# initialization with zeros
failrate = np.zeros(sd_num)
chp_fail_s1 = np.zeros(sd_num)
chp_pass_s1 = np.zeros(sd_num)
chp_fail_s2 = np.zeros(sd_num)
chp_pass_s2 = np.zeros(sd_num)
usp_fail_s1 = np.zeros(sd_num)    # no fail in stage 1 in USP indeed
usp_pass_s1 = np.zeros(sd_num)
usp_fail_s2 = np.zeros(sd_num)
usp_pass_s2 = np.zeros(sd_num)
chp_acc_s1 = np.zeros(sd_num)
chp_acc = np.zeros(sd_num)
usp_acc_s1 = np.zeros(sd_num)
usp_acc = np.zeros(sd_num)

for i in range(sd_num):
# calc the sd in each loop
sd = (end - start) / (sd_num - 1) * i + start
# normal simulation
X = np.random.normal(loc=mu, scale=sd, size=n_batch)
failrate[i] = np.size(X[(X > upp_lvl) | (X < low_lvl)]) / n_batch

# initialization with zeros
# result =  np.array([pass_s1, pass_s2, fail_s1, fail_s2])
chp_result = np.array([0, 0, 0, 0])
usp_result = np.array([0, 0, 0, 0])

for j in range(sim_num):
# a faster version but the result accuracy may be slightly lower
# rng_idx = np.random.randint(0, 19999, size=30)
# s1 = X[rng_idx[:10]]
# s2 = X[rng_idx[10:]]

# a slower version but the result is definitely true
Xperm = np.random.permutation(X)  # fisrt randomization
s1 = Xperm[:n1]     # stage 1 sampling
Xperm2 = np.random.permutation(X[n1:])    # second randomization
s2 = Xperm2[:n2]    # stage 2 sampling

chp = ChPtest(s1=s1, s2=s2, k1=ck1, k2=ck2, k3=ck3, L=cL)
usp = USPtest(s1=s1, s2=s2, k1=uk1, k2=uk2, L1=uL1, L2=uL2)
chp_result = chp_result + chp.result()
usp_result = usp_result + usp.result()

chp_pass_s1[i] = chp_result / sim_num
chp_pass_s2[i] = chp_result / sim_num
chp_fail_s1[i] = chp_result / sim_num
chp_fail_s2[i] = chp_result / sim_num
usp_pass_s1[i] = usp_result / sim_num
usp_pass_s2[i] = usp_result / sim_num
usp_fail_s1[i] = usp_result / sim_num
usp_fail_s2[i] = usp_result / sim_num

if failrate[i] <= 0.05:
chp_acc_s1[i] = chp_pass_s1[i]
chp_acc[i] = chp_pass_s1[i] + chp_pass_s2[i]
usp_acc_s1[i] = usp_pass_s1[i]
usp_acc[i] = usp_pass_s1[i] + usp_pass_s2[i]
else:
chp_acc_s1[i] = chp_fail_s1[i]
chp_acc[i] = chp_fail_s1[i] + chp_fail_s2[i]
# usp_acc_s1[i] = usp_fail_s1[i]
usp_acc[i] = usp_fail_s2[i]

return [failrate, chp_acc_s1, chp_acc, usp_acc_s1, usp_acc]

def sim_mu(sd, start, end, mu_num, sim_num=1000, n1=10, n2=20,
low_lvl=85, upp_lvl=115, ck1=2.2, ck2=0.25, ck3=1.7, cL=15,
uk1=2.4, uk2=2.0, uL1=15, uL2=25):
"""sim_mu provides a method to simulate CUtest with nomal distributions
which are under a given standard deviation and different mean values(mu).
mu starting from start and ending with end, are evenly spaced.

Parameters
----------
sd : float
The standard deivation of the normal distribution.
start : float
The starting standard mean value of the normal distribution.
end : float
The ending mean value of the normal distribution.
mu_num : int
Number of mu to generate.
sim_num : int, optional
Number of test simulations to generate. Default is 1000.
n1 : int, optional
Number of samples tested in the stage 1.
Default is 10, which complies with USP 905 and ChP 0941.
n2 : int, optional
Number of samples tested in the stage 2.
Default is 20, which complies with USP 905 and ChP 0941.
low_lvl : int, optional
The lower content bound of qualified tablets.
Default is 85, which means the content should not be less than 85%
of the label claim.
upp_lvl : int, optional
The upper content bound of qualified tablets.
Default is 115, which means the content should not be greater than 115%
of the label claim.
ck1 : float, optional
The parameter in CUtest.ChPtest(), default is 2.2.
ck2 : float, optional
The parameter in CUtest.ChPtest(), default is 0.25.
ck3 : float, optional
The parameter in CUtest.ChPtest(), default is 1.7.
cL : float, optional
The paramter in CUtest.ChPtest(), default is 15.
uk1 : float, optional
The parameter in CUtest.USPtest(), default is 2.4.
uk2 : float, optional
The parameter in CUtest.USPtest(), default is 2.0.
uL1 : float, optional
The parameter in CUtest.USPtest(), default is 15.
uL2 : float, optional
The parameter in CUtest.USPtest(), default is 25.

Returns
-------
Returns a list, containing failrate, chp_acc_s1, chp_acc, usp_acc_s1
and usp_acc

failrate: numpy array
The rate of unqualified tablets in each simulated batch.
The length eqauls to mu_num.
chp_acc_s1: numpy array
The stage 1 test accuracy of ChP 0941
The length eqauls to mu_num.
chp_acc: numpy array
The test accuracy of ChP 0941.
The length eqauls to mu_num.
usp_acc_s1: numpy array
The stage 1 test accuracy of USP 905.
The length eqauls to mu_num.
usp_acc: numpy array
The test accuracy of USP 905.

Note
---------
In this function, accuray = (TP + TN) / (TP + FP + TN + FN)
"""

n_batch = 20000    # simulate 20k tablets in one batch
# initialization with zeros
failrate = np.zeros(mu_num)
chp_fail_s1 = np.zeros(mu_num)
chp_pass_s1 = np.zeros(mu_num)
chp_fail_s2 = np.zeros(mu_num)
chp_pass_s2 = np.zeros(mu_num)
usp_fail_s1 = np.zeros(mu_num)    # no fail in stage 1 in USP indded
usp_pass_s1 = np.zeros(mu_num)
usp_fail_s2 = np.zeros(mu_num)
usp_pass_s2 = np.zeros(mu_num)
chp_acc_s1 = np.zeros(mu_num)
chp_acc = np.zeros(mu_num)
usp_acc_s1 = np.zeros(mu_num)
usp_acc = np.zeros(mu_num)

for i in range(mu_num):
# calc the sd in each loop
mu = (end - start) / (mu_num - 1) * i + start
# normal simulation
X = np.random.normal(loc=mu, scale=sd, size=n_batch)
failrate[i] = np.size(X[(X > upp_lvl) | (X < low_lvl)]) / n_batch

# initialization with zeros
# result =  np.array([pass_s1, pass_s2, fail_s1, fail_s2])
chp_result = np.array([0, 0, 0, 0])
usp_result = np.array([0, 0, 0, 0])

for j in range(sim_num):
Xperm = np.random.permutation(X)  # fisrt randomization
s1 = Xperm[:n1]     # stage 1 sampling
Xperm2 = np.random.permutation(X[n1:])    # second randomization
s2 = Xperm2[:n2]    # stage 2 sampling

chp = ChPtest(s1=s1, s2=s2, k1=ck1, k2=ck2, k3=ck3, L=cL)
usp = USPtest(s1=s1, s2=s2, k1=uk1, k2=uk2, L1=uL1, L2=uL2)
chp_result = chp_result + chp.result()
usp_result = usp_result + usp.result()

chp_pass_s1[i] = chp_result / sim_num
chp_pass_s2[i] = chp_result / sim_num
chp_fail_s1[i] = chp_result / sim_num
chp_fail_s2[i] = chp_result / sim_num
usp_pass_s1[i] = usp_result / sim_num
usp_pass_s2[i] = usp_result / sim_num
usp_fail_s1[i] = usp_result / sim_num
usp_fail_s2[i] = usp_result / sim_num

if failrate[i] <= 0.05:
chp_acc_s1[i] = chp_pass_s1[i]
chp_acc[i] = chp_pass_s1[i] + chp_pass_s2[i]
usp_acc_s1[i] = usp_pass_s1[i]
usp_acc[i] = usp_pass_s1[i] + usp_pass_s2[i]
else:
chp_acc_s1[i] = chp_fail_s1[i]
chp_acc[i] = chp_fail_s1[i] + chp_fail_s2[i]
# usp_acc_s1[i] = usp_fail_s1[i]
usp_acc[i] = usp_fail_s2[i]

return [failrate, chp_acc_s1, chp_acc, usp_acc_s1, usp_acc]


Mind that the 20k samples are generated using normal distributions, and 10/30 samples are drawn from the 20k samples. There are 2 ways to implement the sampling procedure. The one (used in this article) is to shuffle all the 20k samples randomly, and then drawing the first 10/30 samples. The other (shown in the sim_sd function) is randomly generating 10/30 integers smaller than 20k as the indices, and drawing the samples using the indices. The latter method is much faster, but chances are that one sample might be drawn 2 or more times, which is against the real procedure. However, given the population size is 20k, such situation hardly happens.

## C++ version

The Python version costs a lot of time. The slowest part is to shuffle the 20k samples. But if we choose the latter way to draw samples, the time is not totally satisfactory indeed. Because the code is doing a lot of loops, we may use Numba or Cython to speed the code. Another way or ultimate way is using C++ to cover the core code.

In this post, Eigen is used to replace Numpy to generate random numbers.

### CUtest class

// CUTest.hpp

#pragma once
#include <Eigen/Dense>
#include <EigenRand/EigenRand>
#include <chrono>
#include <cmath>
#include <random>
#include <iostream>

/// @brief standard deviation with 1 degree of freedom (unbiased)
/// @param array an Eigen array
/// @param ddof the degree of freedom, default is 1
/// @return the standard deviation of the array
double std_ddof(Eigen::ArrayXd array, int ddof);

class ChPTest {
public:
// members
Eigen::Array<double, 10, 1> s1;
Eigen::Array<double, 20, 1> s2;
Eigen::Array<double, 30, 1> stage_2;
double k1{ 2.2 };
double k2{ 0.5 };
double k3{ 1.7 };
double L{ 15 };
double std_s1;
double std_stage2;
double mean_s1;
double mean_stage2;

Eigen::Array4d get_result();

// constructor function
ChPTest(const Eigen::ArrayXd& s1_in, const Eigen::ArrayXd& s2_in);
ChPTest(const Eigen::ArrayXd& s1_in, const Eigen::ArrayXd& s2_in, const double k1_in, const double k2_in, const double k3_in, const double L_in);
~ChPTest();
};

class USPTest {
public:
Eigen::Array<double, 10, 1> s1;
Eigen::Array<double, 20, 1> s2;
Eigen::Array<double, 30, 1> stage_2;
double k1{ 2.4 };
double k2{ 2.0 };
double L1{ 15.0 };
double L2{ 25.0 };
double std_s1;
double std_stage2;
double mean_s1;
double mean_stage2;

// constructor function
USPTest(const Eigen::ArrayXd& s1_in, const Eigen::ArrayXd& s2_in);
USPTest(const Eigen::ArrayXd& s1_in, const Eigen::ArrayXd& s2_in, double k1_in, double k2_in, double L1_in, double L2_in);
~USPTest();
// member function
Eigen::Array4d get_result();
};

// CUTest.cpp
#include "CUTest.hpp"

// standard deviation with 1 degree of freedom (unbiased)
double std_ddof(Eigen::ArrayXd array, int ddof=1) {
double standard_deviation = std::sqrt((array - array.mean()).square().sum() / (array.size() - ddof));
return standard_deviation;
}

ChPTest::ChPTest(const Eigen::ArrayXd& s1_in, const Eigen::ArrayXd& s2_in) {
// check length error
if (s1_in.size() != 10) {
throw(std::runtime_error("The length of s1 must be 10!"));
}
if (s2_in.size() != 20) {
throw(std::runtime_error("The length of s1 must be 10!"));
}
s1 = s1_in;
s2 = s2_in;
stage_2 << s1, s2;
mean_s1 = s1.mean();
std_s1 = std_ddof(s1, 1);
mean_stage2 = stage_2.mean();
std_stage2 = std_ddof(stage_2, 1);
}

ChPTest::ChPTest(const Eigen::ArrayXd& s1_in, const Eigen::ArrayXd& s2_in, const double k1_in, const double k2_in, const double k3_in, const double L_in) {
// check length error
if (s1_in.size() != 10) {
throw(std::runtime_error("The length of s1 must be 10!"));
}
if (s2_in.size() != 20) {
throw(std::runtime_error("The legnth of s2 must be 20!"));
}

s1 = s1_in;
s2 = s2_in;
k1 = k1_in;
k2 = k2_in;
k3 = k3_in;
L = L_in;
stage_2 << s1, s2;
mean_s1 = s1.mean();
std_s1 = std_ddof(s1, 1);
mean_stage2 = stage_2.mean();
std_stage2 = std_ddof(stage_2, 1);
}

ChPTest::~ChPTest() { }

//defintion of member function get_result
Eigen::Array4d ChPTest::get_result() {
int pass_s1 = 0;
int pass_s2 = 0;
int fail_s1 = 0;
int fail_s2 = 0;

double Ave = std::abs(100 - mean_s1);
if (Ave + k1 * std_s1 <= L) {
pass_s1 = 1;
}
else if (Ave + std_s1 > L) {
fail_s1 = 1;
}
else {
Ave = std::abs(100 - mean_stage2);
if (Ave <= k2 * L) {
if (pow(Ave, 2) + pow(std_stage2, 2) <= k2 * pow(L, 2)) {
pass_s2 = 1;
}
else {
fail_s2 = 1;
}
}
else
{
if (Ave + k3 * std_stage2 <= L) {
pass_s2 = 1;
}
else {
fail_s2 = 1;
}
}
}
Eigen::Array4d result;
result << pass_s1, pass_s2, fail_s1, fail_s2;
return result;
}

USPTest::USPTest(const Eigen::ArrayXd& s1_in, const Eigen::ArrayXd& s2_in) {
// check length error
if (s1_in.size() != 10) {
throw("The length of s1 must be 10!");
}
if (s2_in.size() != 20) {
throw("The legnth of s2 must be 20!");
}

s1 = s1_in;
s2 = s2_in;
stage_2 << s1, s2;
mean_s1 = s1.mean();
std_s1 = std_ddof(s1, 1);
mean_stage2 = stage_2.mean();
std_stage2 = std_ddof(stage_2, 1);
}

USPTest::USPTest(const Eigen::ArrayXd& s1_in, const Eigen::ArrayXd& s2_in, double k1_in, double k2_in, double L1_in, double L2_in)
{
// check length error
if (s1_in.size() != 10) {
throw("The length of s1 must be 10!");
}
if (s2_in.size() != 20) {
throw("The legnth of s2 must be 20!");
}

s1 = s1_in;
s2 = s2_in;
k1 = k1_in;
k2 = k2_in;
L1 = L1_in;
L2 = L2_in;
stage_2 << s1, s2;
mean_s1 = s1.mean();
std_s1 = std_ddof(s1, 1);
mean_stage2 = stage_2.mean();
std_stage2 = std_ddof(stage_2, 1);
}

USPTest::~USPTest() {}

// return Eigen::Array4d [pass_s1, pass_s2, fail_s1, fail_s2]
Eigen::Array4d USPTest::get_result() {
int pass_s1 = 0;
int pass_s2 = 0;
int fail_s1 = 0;  //no fail in stage 1, indeed
int fail_s2 = 0;

double Ave = 0;
double Ave2 = 0;
double limit = 0;
if (mean_s1 < 98.5) {
Ave = 98.5 - mean_s1;
}
else if (mean_s1 > 101.5) {
Ave = mean_s1 - 101.5;
}
else {
Ave = 0;
}

// stage 1 test
if (Ave + k1 * std_s1 <= L1) {
pass_s1 = 1;
}
else {
if (mean_stage2 < 98.5) {
Ave2 = 98.5 - mean_stage2;
limit = 98.5;
}
else if (mean_stage2 > 101.5) {
Ave2 = mean_stage2 - 101.5;
limit = 101.5;
}
else {
Ave2 = 0;
limit = mean_stage2;
}

if ((Ave2 + k2 * std_stage2 <= L1) && (stage_2 >= (1 - 0.01 * L2) * limit).all() && (stage_2 <= (1 + 0.01 * L2) * limit).all()) {
pass_s2 = 1;
}
else {
fail_s2 = 1;
}
}
Eigen::Array4d result;
result << pass_s1, pass_s2, fail_s1, fail_s2;
return result;
}


### CUsim methods

Since the code is mainly for-loop, we can use openmp to speed it.

// CUSim.hpp

#pragma once
#include "CUTest.hpp"
/// @brief simulate CUtest under different standard deviation values and a fixed mean value
/// @param mu mean value of the normal distribution
/// @param start the start sd
/// @param end the end sd
/// @param mu_num the nunmber of sd values to be generated, defalt is 101
/// @param sim_num the simulation number, default is 10000
/// @param low_lvl the lower level of qualified tablets
/// @param upp_lvl the upper level of qualified tablets
/// @param ck1 the value of k1 in ChPtest
/// @param ck2 the value of k2 in ChPtest
/// @param ck3 the value of k3 in ChPtest
/// @param cL the value of L in ChPtest
/// @param uk1 the value of k1 in USPtest
/// @param uk2 the value of k2 in USPtest
/// @param uL1 the value of L1 in USPtest
/// @param uL2 the value of L2 in USPtest
/// @return vector of [failrate, chp_acc_s1, chp_acc, usp_acc_s1, usp_acc, chp_pass_s1, chp_pass_s2, usp_pass_s1, usp_pass_s2, usp_fail_s1,  usp_fail_s2]
std::vector<Eigen::ArrayXd> sim_sd(double mu, double start, double end, int number_of_threads=8, int sd_num = 61, int sim_num = 10000, double low_lvl = 85.0, double upp_lvl = 115.0,
double ck1 = 2.2, double ck2 = 0.25, double ck3 = 1.7, double cL = 15.0, double uk1 = 2.4, double uk2 = 2.0, double uL1 = 15.0, double uL2 = 25.0);

/// @brief simulate CUtest under different mean values and a fixed standard deviation
/// @param mu standard deviation of the normal distribution
/// @param start the start mu
/// @param end the end mu
/// @param mu_num the nunmber of mu values to be generated, defalt is 101
/// @param sim_num the simulation number, default is 10000
/// @param low_lvl the lower level of qualified tablets
/// @param upp_lvl the upper level of qualified tablets
/// @param ck1 the value of k1 in ChPtest
/// @param ck2 the value of k2 in ChPtest
/// @param ck3 the value of k3 in ChPtest
/// @param cL the value of L in ChPtest
/// @param uk1 the value of k1 in USPtest
/// @param uk2 the value of k2 in USPtest
/// @param uL1 the value of L1 in USPtest
/// @param uL2 the value of L2 in USPtest
/// @return vector of [failrate, chp_acc_s1, chp_acc, usp_acc_s1, usp_acc, chp_pass_s1, chp_pass_s2, usp_pass_s1, usp_pass_s2, usp_fail_s1,  usp_fail_s2]
std::vector<Eigen::ArrayXd> sim_mu(double sd, double start, double end, int number_of_threads = 8, int mu_num=101, int sim_num=10000, double low_lvl=85.0, double upp_lvl=115.0,
double ck1 = 2.2, double ck2 = 0.25, double ck3 = 1.7, double cL = 15.0, double uk1 = 2.4, double uk2 = 2.0, double uL1 = 15.0, double uL2 = 25.0);

// CUSim.cpp
#include "CUTest.hpp"
#include <omp.h>

std::vector<Eigen::ArrayXd> sim_sd(double mu, double start, double end, int number_of_threads, int sd_num, int sim_num, double low_lvl, double upp_lvl,
double ck1, double ck2, double ck3, double cL, double uk1, double uk2, double uL1, double uL2) {
int batch = 20000;    // simulate 20k tablets in one batch
Eigen::ArrayXd failrate = Eigen::ArrayXd::Zero(sd_num);
Eigen::ArrayXd chp_fail_s1 = Eigen::ArrayXd::Zero(sd_num);
Eigen::ArrayXd chp_fail_s2 = Eigen::ArrayXd::Zero(sd_num);
Eigen::ArrayXd chp_pass_s1 = Eigen::ArrayXd::Zero(sd_num);
Eigen::ArrayXd chp_pass_s2 = Eigen::ArrayXd::Zero(sd_num);
Eigen::ArrayXd usp_fail_s1 = Eigen::ArrayXd::Zero(sd_num);    // no fail in stage 1 test indeed
Eigen::ArrayXd usp_fail_s2 = Eigen::ArrayXd::Zero(sd_num);
Eigen::ArrayXd usp_pass_s1 = Eigen::ArrayXd::Zero(sd_num);
Eigen::ArrayXd usp_pass_s2 = Eigen::ArrayXd::Zero(sd_num);
Eigen::ArrayXd chp_acc_s1 = Eigen::ArrayXd::Zero(sd_num);
Eigen::ArrayXd chp_acc = Eigen::ArrayXd::Zero(sd_num);
Eigen::ArrayXd usp_acc_s1 = Eigen::ArrayXd::Zero(sd_num);
Eigen::ArrayXd usp_acc = Eigen::ArrayXd::Zero(sd_num);

std::random_device device;
for (int i = 0; i < sd_num; i++) {
double sd = start + (end - start) * i / double(sd_num - 1);

// normal distribution

Eigen::Rand::Vmt19937_64 urng{ device() };
Eigen::Rand::NormalGen<double> norm_gen{mu, sd};
Eigen::ArrayXd X = norm_gen.generate<Eigen::ArrayXd>(batch, 1, urng);

failrate[i] = double((X > upp_lvl || X < low_lvl).count()) / batch;

//result = np.array([pass_s1, pass_s2, fail_s1, fail_s2])
Eigen::Array4d chp_result = Eigen::Array4d::Zero(4);
Eigen::Array4d usp_result = Eigen::Array4d::Zero(4);

Eigen::Rand::UniformIntGen<int> uni_int_gen{0, 19999};
Eigen::Rand::Vmt19937_64 urng_uni_int{device()};
std::mt19937 g(device());
for (int j = 0; j < sim_num; ++j) {
// a fast simulation method: generating 100 numbers between 0 and 19999
// then choose the unique elements from the 100 numbers
// use the first 10 unique numbers as the s1 indice
// use the 11th to 30th unique numbers as the s2 indice
Eigen::ArrayXi rng_idx = uni_int_gen.generate<Eigen::ArrayXi>(100, 1, urng_uni_int);
std::set<int> unique_idx{ rng_idx.begin(), rng_idx.end() };
Eigen::ArrayXi unique_idx_30(30, 1);
int count_elem = 0;
for (int elem_unique : unique_idx) {
unique_idx_30(count_elem) = elem_unique;
count_elem += 1;
if (count_elem == 30) {
break;
}
}
Eigen::ArrayXd s1 = X(Eigen::seq(0, 9));
Eigen::ArrayXd s2 = X(Eigen::seq(10, 29));
ChPTest chp = ChPTest(s1, s2, ck1, ck2, ck3, cL);
USPTest usp = USPTest(s1, s2, uk1, uk2, uL1, uL2);

chp_result += chp.get_result();
usp_result += usp.get_result();
}

chp_pass_s1[i] = chp_result / double(sim_num);
chp_pass_s2[i] = chp_result / double(sim_num);
chp_fail_s1[i] = chp_result / double(sim_num);
chp_fail_s2[i] = chp_result / double(sim_num);
usp_pass_s1[i] = usp_result / double(sim_num);
usp_pass_s2[i] = usp_result / double(sim_num);
usp_fail_s1[i] = usp_result / double(sim_num);
usp_fail_s2[i] = usp_result / double(sim_num);

if (failrate[i] <= 0.05) {
chp_acc_s1[i] = chp_pass_s1[i];
chp_acc[i] = chp_pass_s1[i] + chp_pass_s2[i];
usp_acc_s1[i] = usp_pass_s1[i];
usp_acc[i] = usp_pass_s1[i] + usp_pass_s2[i];
}
else {
chp_acc_s1[i] = chp_fail_s1[i];
chp_acc[i] = chp_fail_s1[i] + chp_fail_s2[i];
usp_acc[i] = usp_fail_s2[i];
}
}
std::vector<Eigen::ArrayXd> result{ failrate, chp_acc_s1, chp_acc, usp_acc_s1, usp_acc, chp_pass_s1, chp_pass_s2, usp_pass_s1, usp_pass_s2, usp_fail_s1,  usp_fail_s2};
return result;
}

std::vector<Eigen::ArrayXd> sim_mu(double sd, double start, double end, int number_of_threads, int mu_num, int sim_num, double low_lvl, double upp_lvl,
double ck1, double ck2, double ck3, double cL, double uk1, double uk2, double uL1, double uL2) {
int batch = 20000;    // simulate 20k tablets in one batch
Eigen::ArrayXd failrate = Eigen::ArrayXd::Zero(mu_num);
Eigen::ArrayXd chp_fail_s1 = Eigen::ArrayXd::Zero(mu_num);
Eigen::ArrayXd chp_fail_s2 = Eigen::ArrayXd::Zero(mu_num);
Eigen::ArrayXd chp_pass_s1 = Eigen::ArrayXd::Zero(mu_num);
Eigen::ArrayXd chp_pass_s2 = Eigen::ArrayXd::Zero(mu_num);
Eigen::ArrayXd usp_fail_s1 = Eigen::ArrayXd::Zero(mu_num);    // no fail in stage 1 test indeed
Eigen::ArrayXd usp_fail_s2 = Eigen::ArrayXd::Zero(mu_num);
Eigen::ArrayXd usp_pass_s1 = Eigen::ArrayXd::Zero(mu_num);
Eigen::ArrayXd usp_pass_s2 = Eigen::ArrayXd::Zero(mu_num);
Eigen::ArrayXd chp_acc_s1 = Eigen::ArrayXd::Zero(mu_num);
Eigen::ArrayXd chp_acc = Eigen::ArrayXd::Zero(mu_num);
Eigen::ArrayXd usp_acc_s1 = Eigen::ArrayXd::Zero(mu_num);
Eigen::ArrayXd usp_acc = Eigen::ArrayXd::Zero(mu_num);
std::random_device device;
for (int i = 0; i < mu_num; i++) {
double mu = start + (end - start) * i / double(mu_num - 1);

// normal distribution

Eigen::Rand::Vmt19937_64 urng{ device() };
Eigen::Rand::NormalGen<double> norm_gen{ mu, sd };
Eigen::ArrayXd X = norm_gen.generate<Eigen::ArrayXd>(batch, 1, urng);

failrate[i] = double((X > upp_lvl || X < low_lvl).count()) / batch;

//result = np.array([pass_s1, pass_s2, fail_s1, fail_s2])
Eigen::Array4d chp_result = Eigen::Array4d::Zero(4);
Eigen::Array4d usp_result = Eigen::Array4d::Zero(4);

// random draw 30 samples, first 10 as s1, last 20 as s2
//Eigen::Rand::UniformIntGen<int> uni_int_gen{ 0, 19999 };
//Eigen::Rand::Vmt19937_64 urng_uni_int{ device() };
//Eigen::ArrayXi rng_idx = uni_int_gen.generate<Eigen::ArrayXi>(30, 1, urng_uni_int);
std::mt19937 g(device());

for (int j = 0; j < sim_num; ++j) {
//rng_idx = uni_int_gen.generate<Eigen::ArrayXi>(30, 1, urng_uni_int);
//Eigen::ArrayXi idx_s1 = rng_idx(Eigen::seq(0, 9));
//Eigen::ArrayXi idx_s2 = rng_idx(Eigen::seq(10, 29));
//Eigen::Array<double, 10, 1> s1 = X(idx_s1);
//Eigen::Array<double, 20, 1> s2 = X(idx_s2);

std::shuffle(X.begin(), X.end(), g);
Eigen::ArrayXd s1 = X(Eigen::seq(0, 9));
Eigen::ArrayXd s2 = X(Eigen::seq(10, 29));
ChPTest chp = ChPTest(s1, s2, ck1, ck2, ck3, cL);
USPTest usp = USPTest(s1, s2, uk1, uk2, uL1, uL2);
chp_result += chp.get_result();
usp_result += usp.get_result();
}

chp_pass_s1[i] = chp_result / double(sim_num);
chp_pass_s2[i] = chp_result / double(sim_num);
chp_fail_s1[i] = chp_result / double(sim_num);
chp_fail_s2[i] = chp_result / double(sim_num);
usp_pass_s1[i] = usp_result / double(sim_num);
usp_pass_s2[i] = usp_result / double(sim_num);
usp_fail_s1[i] = usp_result / double(sim_num);
usp_fail_s2[i] = usp_result / double(sim_num);

if (failrate[i] <= 0.05) {
chp_acc_s1[i] = chp_pass_s1[i];
chp_acc[i] = chp_pass_s1[i] + chp_pass_s2[i];
usp_acc_s1[i] = usp_pass_s1[i];
usp_acc[i] = usp_pass_s1[i] + usp_pass_s2[i];
}
else {
chp_acc_s1[i] = chp_fail_s1[i];
chp_acc[i] = chp_fail_s1[i] + chp_fail_s2[i];
usp_acc[i] = usp_fail_s2[i];
}
}
std::vector<Eigen::ArrayXd> result{ failrate, chp_acc_s1, chp_acc, usp_acc_s1, usp_acc, chp_pass_s1, chp_pass_s2, usp_pass_s1, usp_pass_s2, usp_fail_s1,  usp_fail_s2 };
return result;
}


### Binding

We have used C++ to write the core part of the experiments and we now want to call the C++ code in Python. Pybind11 provides many conversions between Python and C++, which helps to bind the two languages easily.

//bind.cpp
#include <pybind11/pybind11.h>
#include <pybind11/eigen.h>
#include <pybind11/stl.h>
#include "CUSim.hpp"
#include "CUSim_fast.hpp"

namespace py = pybind11;

PYBIND11_MODULE(cusim, m) {
m.def("sim_sd", &sim_sd, "CUSim",
py::arg("mu") = 100, py::arg("start") = 3, py::arg("end") = 15, py::arg("number_of_threads") = 8,
py::arg("sd_num") = 61, py::arg("sim_num") = 10000, py::arg("low_lvl") = 85, py::arg("upp_lvl") = 115,
py::arg("ck1") = 2.2, py::arg("ck2") = 0.25, py::arg("ck3") = 1.7, py::arg("cL") = 15,
py::arg("uk1") = 2.4, py::arg("uk2") = 2.0, py::arg("uL1") = 15, py::arg("uL2") = 25);
m.def("sim_mu", &sim_mu, "simulate mu",
py::arg("sd") = 10, py::arg("start") = 90, py::arg("end") = 110, py::arg("number_of_threads") = 8,
py::arg("mu_num") = 101, py::arg("sim_num") = 10000, py::arg("low_lvl") = 85, py::arg("upp_lvl") = 115,
py::arg("ck1") = 2.2, py::arg("ck2") = 0.25, py::arg("ck3") = 1.7, py::arg("cL") = 15,
py::arg("uk1") = 2.4, py::arg("uk2") = 2.0, py::arg("uL1") = 15, py::arg("uL2") = 25);
m.def("sim_sd_fast", &sim_sd_fast, "simulate sd",
py::arg("mu") = 100, py::arg("start") = 3, py::arg("end") = 15, py::arg("number_of_threads") = 8,
py::arg("sd_num") = 61, py::arg("sim_num") = 10000, py::arg("low_lvl") = 85, py::arg("upp_lvl") = 115,
py::arg("ck1") = 2.2, py::arg("ck2") = 0.25, py::arg("ck3") = 1.7, py::arg("cL") = 15,
py::arg("uk1") = 2.4, py::arg("uk2") = 2.0, py::arg("uL1") = 15, py::arg("uL2") = 25);
m.def("sim_mu_fast", &sim_mu_fast, "simulate mu",
py::arg("sd") = 10, py::arg("start") = 90, py::arg("end") = 110, py::arg("number_of_threads") = 8,
py::arg("mu_num") = 101, py::arg("sim_num") = 10000, py::arg("low_lvl") = 85, py::arg("upp_lvl") = 115,
py::arg("ck1") = 2.2, py::arg("ck2") = 0.25, py::arg("ck3") = 1.7, py::arg("cL") = 15,
py::arg("uk1") = 2.4, py::arg("uk2") = 2.0, py::arg("uL1") = 15, py::arg("uL2") = 25);
}


1. Comparison Study of the Content Uniformity Tests in Chinese Pharmacopoeia and United States Pharmacopeia Based on Monte Carlo Simulation[J].Chin J Mod Appl Pharm, 2019, 36(19):2405-2410.DOI:10.13748/j.cnki.issn1007-7693.2019.19.008. ↩︎