Content uniformity tests process Implemention in C++ and Python codes. Using C++ to speed codes for 100 times.

Content uniformity (CU) is crucial for drug products. In the Chinese Pharmacopoeia(ChP) and the United States Pharmacopedia (USP), CU tests are similar but still have some differences. To find out the how the CU tests in ChP and USP (denoted as ChPCU and USPCU) differ under different circumstances, we implemented a simulation study based on Monte-Carlo. The results of the study can be found in the paper[1].

In this blog, I released the code of the Monte Carlo simulation process in both Python and C++. Just for myself to remember the code details.

The Python version

CUtest class

The core of the simulation experiement is the CU test. We used Monte-Carlo method to call the CU test repeatedly and then got the accuracy and other metrics of ChPCU and USPCU. Therefore, the CU test is wrapped into a class firstly.

# CUtest.py

# -*- coding: utf-8 -*-
import numpy as np


class ChPtest():
    """ChPtest provides Content Uniformity test in Chinese Pharmacopeia

    """

    def __init__(self, s1, s2, k1=2.2, k2=0.25, k3=1.7, L=15):
        """input two (numpy) arrays, s1 and s2, as the sampling results
        in the fist stage and the second stage.

        Parameters
        ----------
        s1 : array like
            the sampling result in the first stage of content uniformity test
            size should be equal to 10.
        s2 : array like
            the sampling result in the second stage of content uniformity test
            size should be equal to 20.
        k1 : float
            parameter in ChP 0941
        k2: float
            parameter in ChP 0941
        k3: float
            parameter in ChP 0941
        L: float
            parameter in ChP 0941
        """

        if len(s1) != 10:
            raise ValueError('THe length of s1 should be 10')
        elif len(s2) != 20:
            raise ValueError('The length of s2 should be 20')
        else:
            self.stage1 = s1
            self.__s2 = s2
            self.k1 = k1
            self.k2 = k2
            self.k3 = k3
            self.L = L
            self.stage2 = np.concatenate([s1, s2])    # merge s1 and s2 into stage2
            self.mean_s1 = np.mean(self.stage1)
            # sd: degree of freedom = n-1, in accordance with ChP 0941
            self.sd_s1 = np.std(self.stage1, ddof=1)    # unbiased estimator
            self.mean_s2 = np.mean(self.stage2)
            self.sd_s2 = np.std(self.stage2, ddof=1)

    def result(self):
        """returns ChPtest result

        Returns
        -----------
        [pass_s1, pass_s2, fail_s1, fail_s2]: numpy arrays
            pass_s1: 0 or 1. If passes in stage 1 test, the value is 1.
            pass_s2: 0 or 1. If passes in stage 2 test, the value is 1.
            fail_s1: 0 or 1. If fails in stage 1 test, the value is 1.
            fail_s2: 0 or 1. If fails in stage 2 test, the value is 1.

        """
        # initialization with 0
        pass_s1 = 0
        fail_s1 = 0
        pass_s2 = 0
        fail_s2 = 0

        # if passes stage 1 test, pass_1 add 1
        # if fails stage 1 test, fail_1 add 1
        # if keeps unclear, go to stage 2 test
        Ave = abs(100 - self.mean_s1)
        if Ave + self.k1 * self.sd_s1 <= self.L:    # if A + k1 * sd < L, pass
            pass_s1 = 1
        elif Ave + self.sd_s1 > self.L:    # if A + sd > L, fail
            fail_s1 = 1

        # stage 2 test
        else:
            Ave = abs(100 - self.mean_s2)    # recalculate Ave
            if Ave <= self.k2 * self.L:    # if A <= k2 * L
                # if A^2 + sd^2 <= k2 * L, pass
                if Ave ** 2 + self.sd_s2 ** 2 <= self.k2 * self.L ** 2:
                    pass_s2 = 1
                else:
                    fail_s2 = 1
            else:
                # if A > k2 * L and A + k3 * sd <= L, pass
                if Ave + self.k3 * self.sd_s2 <= self.L:
                    pass_s2 = 1
                else:
                    fail_s2 = 1
        return np.array([pass_s1, pass_s2, fail_s1, fail_s2])


class USPtest():
    """USPtest provides Content Uniformity test in US Pharmacopeia.

    """
    def __init__(self, s1, s2, k1=2.4, k2=2.0, L1=15, L2=25):
        if len(s1) != 10:
            raise ValueError('THe length of s1 should be 10')
        elif len(s2) != 20:
            raise ValueError('The length of s2 should be 20')
        else:
            self.stage1 = s1
            self.__s2 = s2
            self.stage2 = np.concatenate([s1, s2])    # merge s1 and s2 into stage2
            self.k1 = k1
            self.k2 = k2
            self.L1 = L1
            self.L2 = L2
            self.mean_s1 = np.mean(self.stage1)
            # sd: degree of freedom = n-1, in accordance with USP 905
            self.sd_s1 = np.std(self.stage1, ddof=1)    # unbiased estimator
            self.mean_s2 = np.mean(self.stage2)
            self.sd_s2 = np.std(self.stage2, ddof=1)

    def result(self):
        """returns the USPtest result

        Returns
        -----------
        [pass_s1, pass_s2, fail_s1, fail_s2]: numpy arrays
            pass_s1: 0 or 1. If passes in stage 1 test, the value is 1.
            pass_s2: 0 or 1. If passes in stage 2 test, the value is 1.
            fail_s1: 0 or 1. If fails in stage 1 test, the value is 1.
        Indeed, the value of fail_s1 should always be 0 because USP 905 goes to
        the stage 2 test directly instead of giving a result of fail
        if it fails in the stage 1 test.

            fail_s2: 0 or 1. If fails in stage 2 test, the value is 1.

        """
        pass_s1 = 0
        fail_s1 = 0    # no fail in stage 1 test indeed
        pass_s2 = 0
        fail_s2 = 0

        # if passes stage 1 test, pass_s1 add 1
        # if fails stage 1 test, go to stage 2 test
        if self.mean_s1 < 98.5:
            Ave = 98.5 - self.mean_s1
        elif self.mean_s1 > 101.5:
            Ave = self.mean_s1 - 101.5
        else:
            Ave = 0

        # stage 1 test
        if Ave + self.k1 * self.sd_s1 <= self.L1:    # if A + k1 * sd <= L1
            pass_s1 = 1

        # stage 2 test
        else:
            if self.mean_s2 < 98.5:
                Ave2 = 98.5 - self.mean_s2
                limit = 98.5
            elif self.mean_s2 > 101.5:
                Ave2 = self.mean_s2 - 101.5
                limit = 101.5
            else:
                Ave2 = 0
                limit = self.mean_s2

            # Ave + k2 * sd <= L1 and all the tablets fall in the quality range
            if ((Ave2 + self.k2 * self.sd_s2 <= self.L1) &
               (np.all(self.stage2 >= (1 - 0.01 * self.L2) * limit)) &
               (np.all(self.stage2 <= (1 + 0.01 * self.L2) * limit))):
                pass_s2 = 1
            else:
                fail_s2 = 1

        return np.array([pass_s1, pass_s2, fail_s1, fail_s2])

CUsim methods

The Monte-Carlo experiments are implemented in the CUsim.py. Two methods are provided, sim_sd and sim_mu.

# CUsim.py

import numpy as np
from CUtest import ChPtest, USPtest


def sim_sd(mu, start, end, sd_num, sim_num=1000, n1=10, n2=20,
           low_lvl=85, upp_lvl=115, ck1=2.2, ck2=0.25, ck3=1.7, cL=15,
           uk1=2.4, uk2=2.0, uL1=15, uL2=25):
    """sim_sd provides a method to simulate CUtest with nomal distributions
    which are under a given mean value and different standard deviations(`sd`).
    `sd` starting from `start` and ending with `end`, are evenly spaced.

    Parameters
    ----------
    mu : float
        The mean value of the normal distribution.
    start : float
        The starting standard deviation of the normal distribution.
    end : float
        The ending standard deviation of the normal distribution.
    sd_num : int
        Number of sd to generate.
    sim_num : int, optional
        Number of test simulations to generate. Default is 1000.
    n1 : int, optional
        Number of samples tested in the stage 1.
        Default is 10, which complies with USP 905 and ChP 0941.
    n2 : int, optional
        Number of samples tested in the stage 2.
        Default is 20, which complies with USP 905 and ChP 0941.
    low_lvl : int, optional
        The lower content bound of qualified tablets.
        Default is 85, which means the content should not be less than 85%
        of the label claim.
    upp_lvl : int, optional
        The upper content bound of qualified tablets.
        Default is 115, which means the content should not be greater than 115%
        of the label claim.
    ck1 : float, optional
        The parameter in CUtest.ChPtest(), default is 2.2.
    ck2 : float, optional
        The parameter in CUtest.ChPtest(), default is 0.25.
    ck3 : float, optional
        The parameter in CUtest.ChPtest(), default is 1.7.
    cL : float, optional
        The paramter in CUtest.ChPtest(), default is 15.
    uk1 : float, optional
        The parameter in CUtest.USPtest(), default is 2.4.
    uk2 : float, optional
        The parameter in CUtest.USPtest(), default is 2.0.
    uL1 : float, optional
        The parameter in CUtest.USPtest(), default is 15.
    uL2 : float, optional
        The parameter in CUtest.USPtest(), default is 25.

    Returns
    -------
    Returns a list, containing `failrate`, `chp_acc_s1`, `chp_acc`, `usp_acc_s1`
    and `usp_acc`

    failrate: numpy array
        The rate of unqualified tablets in each simulated batch.
        The length eqauls to `sd_num`.
    chp_acc_s1: numpy array
        The stage 1 test accuracy of ChP 0941
    chp_acc: numpy array
        The test accuracy of ChP 0941.
    usp_acc_s1: numpy array
        The stage 1 test accuracy of USP 905.
    usp_acc: numpy array
        The test accuracy of USP 905.

    Note
    ---------
    In this function, accuray = (TP + TN) / (TP + FP + TN + FN)
    """

    n_batch = 20000    # simulate 20k tablets in one batch
    # initialization with zeros
    failrate = np.zeros(sd_num)
    chp_fail_s1 = np.zeros(sd_num)
    chp_pass_s1 = np.zeros(sd_num)
    chp_fail_s2 = np.zeros(sd_num)
    chp_pass_s2 = np.zeros(sd_num)
    usp_fail_s1 = np.zeros(sd_num)    # no fail in stage 1 in USP indeed
    usp_pass_s1 = np.zeros(sd_num)
    usp_fail_s2 = np.zeros(sd_num)
    usp_pass_s2 = np.zeros(sd_num)
    chp_acc_s1 = np.zeros(sd_num)
    chp_acc = np.zeros(sd_num)
    usp_acc_s1 = np.zeros(sd_num)
    usp_acc = np.zeros(sd_num)

    for i in range(sd_num):
        # calc the sd in each loop
        sd = (end - start) / (sd_num - 1) * i + start
        # normal simulation
        X = np.random.normal(loc=mu, scale=sd, size=n_batch)
        failrate[i] = np.size(X[(X > upp_lvl) | (X < low_lvl)]) / n_batch

        # initialization with zeros
        # result =  np.array([pass_s1, pass_s2, fail_s1, fail_s2])
        chp_result = np.array([0, 0, 0, 0])
        usp_result = np.array([0, 0, 0, 0])

        for j in range(sim_num):
            # a faster version but the result accuracy may be slightly lower
            # rng_idx = np.random.randint(0, 19999, size=30)
            # s1 = X[rng_idx[:10]]
            # s2 = X[rng_idx[10:]]

            # a slower version but the result is definitely true
            Xperm = np.random.permutation(X)  # fisrt randomization
            s1 = Xperm[:n1]     # stage 1 sampling
            Xperm2 = np.random.permutation(X[n1:])    # second randomization
            s2 = Xperm2[:n2]    # stage 2 sampling

            chp = ChPtest(s1=s1, s2=s2, k1=ck1, k2=ck2, k3=ck3, L=cL)
            usp = USPtest(s1=s1, s2=s2, k1=uk1, k2=uk2, L1=uL1, L2=uL2)
            chp_result = chp_result + chp.result()
            usp_result = usp_result + usp.result()

        chp_pass_s1[i] = chp_result[0] / sim_num
        chp_pass_s2[i] = chp_result[1] / sim_num
        chp_fail_s1[i] = chp_result[2] / sim_num
        chp_fail_s2[i] = chp_result[3] / sim_num
        usp_pass_s1[i] = usp_result[0] / sim_num
        usp_pass_s2[i] = usp_result[1] / sim_num
        usp_fail_s1[i] = usp_result[2] / sim_num
        usp_fail_s2[i] = usp_result[3] / sim_num

        if failrate[i] <= 0.05:
            chp_acc_s1[i] = chp_pass_s1[i]
            chp_acc[i] = chp_pass_s1[i] + chp_pass_s2[i]
            usp_acc_s1[i] = usp_pass_s1[i]
            usp_acc[i] = usp_pass_s1[i] + usp_pass_s2[i]
        else:
            chp_acc_s1[i] = chp_fail_s1[i]
            chp_acc[i] = chp_fail_s1[i] + chp_fail_s2[i]
            # usp_acc_s1[i] = usp_fail_s1[i]
            usp_acc[i] = usp_fail_s2[i]

    return [failrate, chp_acc_s1, chp_acc, usp_acc_s1, usp_acc]


def sim_mu(sd, start, end, mu_num, sim_num=1000, n1=10, n2=20,
           low_lvl=85, upp_lvl=115, ck1=2.2, ck2=0.25, ck3=1.7, cL=15,
           uk1=2.4, uk2=2.0, uL1=15, uL2=25):
    """sim_mu provides a method to simulate CUtest with nomal distributions
    which are under a given standard deviation and different mean values(`mu`).
    `mu` starting from `start` and ending with `end`, are evenly spaced.

    Parameters
    ----------
    sd : float
        The standard deivation of the normal distribution.
    start : float
        The starting standard mean value of the normal distribution.
    end : float
        The ending mean value of the normal distribution.
    mu_num : int
        Number of `mu` to generate.
    sim_num : int, optional
        Number of test simulations to generate. Default is 1000.
    n1 : int, optional
        Number of samples tested in the stage 1.
        Default is 10, which complies with USP 905 and ChP 0941.
    n2 : int, optional
        Number of samples tested in the stage 2.
        Default is 20, which complies with USP 905 and ChP 0941.
    low_lvl : int, optional
        The lower content bound of qualified tablets.
        Default is 85, which means the content should not be less than 85%
        of the label claim.
    upp_lvl : int, optional
        The upper content bound of qualified tablets.
        Default is 115, which means the content should not be greater than 115%
        of the label claim.
    ck1 : float, optional
        The parameter in CUtest.ChPtest(), default is 2.2.
    ck2 : float, optional
        The parameter in CUtest.ChPtest(), default is 0.25.
    ck3 : float, optional
        The parameter in CUtest.ChPtest(), default is 1.7.
    cL : float, optional
        The paramter in CUtest.ChPtest(), default is 15.
    uk1 : float, optional
        The parameter in CUtest.USPtest(), default is 2.4.
    uk2 : float, optional
        The parameter in CUtest.USPtest(), default is 2.0.
    uL1 : float, optional
        The parameter in CUtest.USPtest(), default is 15.
    uL2 : float, optional
        The parameter in CUtest.USPtest(), default is 25.

    Returns
    -------
    Returns a list, containing `failrate`, `chp_acc_s1`, `chp_acc`, `usp_acc_s1`
    and `usp_acc`

    failrate: numpy array
        The rate of unqualified tablets in each simulated batch.
        The length eqauls to `mu_num`.
    chp_acc_s1: numpy array
        The stage 1 test accuracy of ChP 0941
        The length eqauls to `mu_num`.
    chp_acc: numpy array
        The test accuracy of ChP 0941.
        The length eqauls to `mu_num`.
    usp_acc_s1: numpy array
        The stage 1 test accuracy of USP 905.
        The length eqauls to `mu_num`.
    usp_acc: numpy array
        The test accuracy of USP 905.


    Note
    ---------
    In this function, accuray = (TP + TN) / (TP + FP + TN + FN)
    """

    n_batch = 20000    # simulate 20k tablets in one batch
    # initialization with zeros
    failrate = np.zeros(mu_num)
    chp_fail_s1 = np.zeros(mu_num)
    chp_pass_s1 = np.zeros(mu_num)
    chp_fail_s2 = np.zeros(mu_num)
    chp_pass_s2 = np.zeros(mu_num)
    usp_fail_s1 = np.zeros(mu_num)    # no fail in stage 1 in USP indded
    usp_pass_s1 = np.zeros(mu_num)
    usp_fail_s2 = np.zeros(mu_num)
    usp_pass_s2 = np.zeros(mu_num)
    chp_acc_s1 = np.zeros(mu_num)
    chp_acc = np.zeros(mu_num)
    usp_acc_s1 = np.zeros(mu_num)
    usp_acc = np.zeros(mu_num)

    for i in range(mu_num):
        # calc the sd in each loop
        mu = (end - start) / (mu_num - 1) * i + start
        # normal simulation
        X = np.random.normal(loc=mu, scale=sd, size=n_batch)
        failrate[i] = np.size(X[(X > upp_lvl) | (X < low_lvl)]) / n_batch

        # initialization with zeros
        # result =  np.array([pass_s1, pass_s2, fail_s1, fail_s2])
        chp_result = np.array([0, 0, 0, 0])
        usp_result = np.array([0, 0, 0, 0])

        for j in range(sim_num):
            Xperm = np.random.permutation(X)  # fisrt randomization
            s1 = Xperm[:n1]     # stage 1 sampling
            Xperm2 = np.random.permutation(X[n1:])    # second randomization
            s2 = Xperm2[:n2]    # stage 2 sampling

            chp = ChPtest(s1=s1, s2=s2, k1=ck1, k2=ck2, k3=ck3, L=cL)
            usp = USPtest(s1=s1, s2=s2, k1=uk1, k2=uk2, L1=uL1, L2=uL2)
            chp_result = chp_result + chp.result()
            usp_result = usp_result + usp.result()

        chp_pass_s1[i] = chp_result[0] / sim_num
        chp_pass_s2[i] = chp_result[1] / sim_num
        chp_fail_s1[i] = chp_result[2] / sim_num
        chp_fail_s2[i] = chp_result[3] / sim_num
        usp_pass_s1[i] = usp_result[0] / sim_num
        usp_pass_s2[i] = usp_result[1] / sim_num
        usp_fail_s1[i] = usp_result[2] / sim_num
        usp_fail_s2[i] = usp_result[3] / sim_num

        if failrate[i] <= 0.05:
            chp_acc_s1[i] = chp_pass_s1[i]
            chp_acc[i] = chp_pass_s1[i] + chp_pass_s2[i]
            usp_acc_s1[i] = usp_pass_s1[i]
            usp_acc[i] = usp_pass_s1[i] + usp_pass_s2[i]
        else:
            chp_acc_s1[i] = chp_fail_s1[i]
            chp_acc[i] = chp_fail_s1[i] + chp_fail_s2[i]
            # usp_acc_s1[i] = usp_fail_s1[i]
            usp_acc[i] = usp_fail_s2[i]

    return [failrate, chp_acc_s1, chp_acc, usp_acc_s1, usp_acc]

Mind that the 20k samples are generated using normal distributions, and 10/30 samples are drawn from the 20k samples. There are 2 ways to implement the sampling procedure. The one (used in this article) is to shuffle all the 20k samples randomly, and then drawing the first 10/30 samples. The other (shown in the sim_sd function) is randomly generating 10/30 integers smaller than 20k as the indices, and drawing the samples using the indices. The latter method is much faster, but chances are that one sample might be drawn 2 or more times, which is against the real procedure. However, given the population size is 20k, such situation hardly happens.

C++ version

The Python version costs a lot of time. The slowest part is to shuffle the 20k samples. But if we choose the latter way to draw samples, the time is not totally satisfactory indeed. Because the code is doing a lot of loops, we may use Numba or Cython to speed the code. Another way or ultimate way is using C++ to cover the core code.

In this post, Eigen is used to replace Numpy to generate random numbers.

CUtest class

// CUTest.hpp

#pragma once
#include <Eigen/Dense>
#include <EigenRand/EigenRand>
#include <chrono>
#include <cmath>
#include <random>
#include <iostream>

/// @brief standard deviation with 1 degree of freedom (unbiased)
/// @param array an Eigen array
/// @param ddof the degree of freedom, default is 1
/// @return the standard deviation of the array
double std_ddof(Eigen::ArrayXd array, int ddof);

class ChPTest {
public:
    // members
    Eigen::Array<double, 10, 1> s1;
    Eigen::Array<double, 20, 1> s2;
    Eigen::Array<double, 30, 1> stage_2;
    double k1{ 2.2 };
    double k2{ 0.5 };
    double k3{ 1.7 };
    double L{ 15 };
    double std_s1;
    double std_stage2;
    double mean_s1;
    double mean_stage2;

    Eigen::Array4d get_result();

    // constructor function
    ChPTest(const Eigen::ArrayXd& s1_in, const Eigen::ArrayXd& s2_in);
    ChPTest(const Eigen::ArrayXd& s1_in, const Eigen::ArrayXd& s2_in, const double k1_in, const double k2_in, const double k3_in, const double L_in);
    ~ChPTest();
};

class USPTest {
public:
    Eigen::Array<double, 10, 1> s1;
    Eigen::Array<double, 20, 1> s2;
    Eigen::Array<double, 30, 1> stage_2;
    double k1{ 2.4 };
    double k2{ 2.0 };
    double L1{ 15.0 };
    double L2{ 25.0 };
    double std_s1;
    double std_stage2;
    double mean_s1;
    double mean_stage2;

    // constructor function
    USPTest(const Eigen::ArrayXd& s1_in, const Eigen::ArrayXd& s2_in);
    USPTest(const Eigen::ArrayXd& s1_in, const Eigen::ArrayXd& s2_in, double k1_in, double k2_in, double L1_in, double L2_in);
    ~USPTest();
    // member function
    Eigen::Array4d get_result();
};

// CUTest.cpp
#include "CUTest.hpp"

// standard deviation with 1 degree of freedom (unbiased)
double std_ddof(Eigen::ArrayXd array, int ddof=1) {
    double standard_deviation = std::sqrt((array - array.mean()).square().sum() / (array.size() - ddof));
    return standard_deviation;
}

ChPTest::ChPTest(const Eigen::ArrayXd& s1_in, const Eigen::ArrayXd& s2_in) {
    // check length error
    if (s1_in.size() != 10) {
        throw(std::runtime_error("The length of s1 must be 10!"));
    }
    if (s2_in.size() != 20) {
        throw(std::runtime_error("The length of s1 must be 10!"));
    }
    s1 = s1_in;
    s2 = s2_in;
    stage_2 << s1, s2;
    mean_s1 = s1.mean();
    std_s1 = std_ddof(s1, 1);
    mean_stage2 = stage_2.mean();
    std_stage2 = std_ddof(stage_2, 1);
}

ChPTest::ChPTest(const Eigen::ArrayXd& s1_in, const Eigen::ArrayXd& s2_in, const double k1_in, const double k2_in, const double k3_in, const double L_in) {
    // check length error
    if (s1_in.size() != 10) {
        throw(std::runtime_error("The length of s1 must be 10!"));
    }
    if (s2_in.size() != 20) {
        throw(std::runtime_error("The legnth of s2 must be 20!"));
    }

    s1 = s1_in;
    s2 = s2_in;
    k1 = k1_in;
    k2 = k2_in;
    k3 = k3_in;
    L = L_in;
    stage_2 << s1, s2;
    mean_s1 = s1.mean();
    std_s1 = std_ddof(s1, 1);
    mean_stage2 = stage_2.mean();
    std_stage2 = std_ddof(stage_2, 1);
}

ChPTest::~ChPTest() { }

//defintion of member function get_result
Eigen::Array4d ChPTest::get_result() {
    int pass_s1 = 0;
    int pass_s2 = 0;
    int fail_s1 = 0;
    int fail_s2 = 0;

    double Ave = std::abs(100 - mean_s1);
    if (Ave + k1 * std_s1 <= L) {
        pass_s1 = 1;
    }
    else if (Ave + std_s1 > L) {
        fail_s1 = 1;
    }
    else {
        Ave = std::abs(100 - mean_stage2);
        if (Ave <= k2 * L) {
            if (pow(Ave, 2) + pow(std_stage2, 2) <= k2 * pow(L, 2)) {
                pass_s2 = 1;
            }
            else {
                fail_s2 = 1;
            }
        }
        else
        {
            if (Ave + k3 * std_stage2 <= L) {
                pass_s2 = 1;
            }
            else {
                fail_s2 = 1;
            }
        }
    }
    Eigen::Array4d result;
    result << pass_s1, pass_s2, fail_s1, fail_s2;
    return result;
}


USPTest::USPTest(const Eigen::ArrayXd& s1_in, const Eigen::ArrayXd& s2_in) {
    // check length error
    if (s1_in.size() != 10) {
        throw("The length of s1 must be 10!");
    }
    if (s2_in.size() != 20) {
        throw("The legnth of s2 must be 20!");
    }

    s1 = s1_in;
    s2 = s2_in;
    stage_2 << s1, s2;
    mean_s1 = s1.mean();
    std_s1 = std_ddof(s1, 1);
    mean_stage2 = stage_2.mean();
    std_stage2 = std_ddof(stage_2, 1);
}

USPTest::USPTest(const Eigen::ArrayXd& s1_in, const Eigen::ArrayXd& s2_in, double k1_in, double k2_in, double L1_in, double L2_in)
{
    // check length error
    if (s1_in.size() != 10) {
        throw("The length of s1 must be 10!");
    }
    if (s2_in.size() != 20) {
        throw("The legnth of s2 must be 20!");
    }

    s1 = s1_in;
    s2 = s2_in;
    k1 = k1_in;
    k2 = k2_in;
    L1 = L1_in;
    L2 = L2_in;
    stage_2 << s1, s2;
    mean_s1 = s1.mean();
    std_s1 = std_ddof(s1, 1);
    mean_stage2 = stage_2.mean();
    std_stage2 = std_ddof(stage_2, 1);
}

USPTest::~USPTest() {}

// return Eigen::Array4d [pass_s1, pass_s2, fail_s1, fail_s2]
Eigen::Array4d USPTest::get_result() {
    int pass_s1 = 0;
    int pass_s2 = 0;
    int fail_s1 = 0;  //no fail in stage 1, indeed
    int fail_s2 = 0;

    double Ave = 0;
    double Ave2 = 0;
    double limit = 0;
    if (mean_s1 < 98.5) {
        Ave = 98.5 - mean_s1;
    }
    else if (mean_s1 > 101.5) {
        Ave = mean_s1 - 101.5;
    }
    else {
        Ave = 0;
    }

    // stage 1 test
    if (Ave + k1 * std_s1 <= L1) {
        pass_s1 = 1;
    }
    else {
        if (mean_stage2 < 98.5) {
            Ave2 = 98.5 - mean_stage2;
            limit = 98.5;
        }
        else if (mean_stage2 > 101.5) {
            Ave2 = mean_stage2 - 101.5;
            limit = 101.5;
        }
        else {
            Ave2 = 0;
            limit = mean_stage2;
        }

        if ((Ave2 + k2 * std_stage2 <= L1) && (stage_2 >= (1 - 0.01 * L2) * limit).all() && (stage_2 <= (1 + 0.01 * L2) * limit).all()) {
            pass_s2 = 1;
        }
        else {
            fail_s2 = 1;
        }
    }
    Eigen::Array4d result;
    result << pass_s1, pass_s2, fail_s1, fail_s2;
    return result;
}

CUsim methods

Since the code is mainly for-loop, we can use openmp to speed it.

// CUSim.hpp

#pragma once
#include "CUTest.hpp"
/// @brief simulate CUtest under different standard deviation values and a fixed mean value
/// @param mu mean value of the normal distribution
/// @param start the start sd
/// @param end the end sd
/// @param mu_num the nunmber of sd values to be generated, defalt is 101
/// @param sim_num the simulation number, default is 10000
/// @param low_lvl the lower level of qualified tablets
/// @param upp_lvl the upper level of qualified tablets 
/// @param ck1 the value of k1 in ChPtest
/// @param ck2 the value of k2 in ChPtest
/// @param ck3 the value of k3 in ChPtest
/// @param cL the value of L in ChPtest
/// @param uk1 the value of k1 in USPtest
/// @param uk2 the value of k2 in USPtest
/// @param uL1 the value of L1 in USPtest
/// @param uL2 the value of L2 in USPtest
/// @return vector of [failrate, chp_acc_s1, chp_acc, usp_acc_s1, usp_acc, chp_pass_s1, chp_pass_s2, usp_pass_s1, usp_pass_s2, usp_fail_s1,  usp_fail_s2]
std::vector<Eigen::ArrayXd> sim_sd(double mu, double start, double end, int number_of_threads=8, int sd_num = 61, int sim_num = 10000, double low_lvl = 85.0, double upp_lvl = 115.0,
    double ck1 = 2.2, double ck2 = 0.25, double ck3 = 1.7, double cL = 15.0, double uk1 = 2.4, double uk2 = 2.0, double uL1 = 15.0, double uL2 = 25.0);

/// @brief simulate CUtest under different mean values and a fixed standard deviation
/// @param mu standard deviation of the normal distribution
/// @param start the start mu
/// @param end the end mu
/// @param mu_num the nunmber of mu values to be generated, defalt is 101
/// @param sim_num the simulation number, default is 10000
/// @param low_lvl the lower level of qualified tablets
/// @param upp_lvl the upper level of qualified tablets 
/// @param ck1 the value of k1 in ChPtest
/// @param ck2 the value of k2 in ChPtest
/// @param ck3 the value of k3 in ChPtest
/// @param cL the value of L in ChPtest
/// @param uk1 the value of k1 in USPtest
/// @param uk2 the value of k2 in USPtest
/// @param uL1 the value of L1 in USPtest
/// @param uL2 the value of L2 in USPtest
/// @return vector of [failrate, chp_acc_s1, chp_acc, usp_acc_s1, usp_acc, chp_pass_s1, chp_pass_s2, usp_pass_s1, usp_pass_s2, usp_fail_s1,  usp_fail_s2]
std::vector<Eigen::ArrayXd> sim_mu(double sd, double start, double end, int number_of_threads = 8, int mu_num=101, int sim_num=10000, double low_lvl=85.0, double upp_lvl=115.0,
    double ck1 = 2.2, double ck2 = 0.25, double ck3 = 1.7, double cL = 15.0, double uk1 = 2.4, double uk2 = 2.0, double uL1 = 15.0, double uL2 = 25.0);

// CUSim.cpp
#include "CUTest.hpp"
#include <omp.h>

std::vector<Eigen::ArrayXd> sim_sd(double mu, double start, double end, int number_of_threads, int sd_num, int sim_num, double low_lvl, double upp_lvl,
	double ck1, double ck2, double ck3, double cL, double uk1, double uk2, double uL1, double uL2) {
	int batch = 20000;    // simulate 20k tablets in one batch
	Eigen::ArrayXd failrate = Eigen::ArrayXd::Zero(sd_num);
	Eigen::ArrayXd chp_fail_s1 = Eigen::ArrayXd::Zero(sd_num);
	Eigen::ArrayXd chp_fail_s2 = Eigen::ArrayXd::Zero(sd_num);
	Eigen::ArrayXd chp_pass_s1 = Eigen::ArrayXd::Zero(sd_num);
	Eigen::ArrayXd chp_pass_s2 = Eigen::ArrayXd::Zero(sd_num);
	Eigen::ArrayXd usp_fail_s1 = Eigen::ArrayXd::Zero(sd_num);    // no fail in stage 1 test indeed
	Eigen::ArrayXd usp_fail_s2 = Eigen::ArrayXd::Zero(sd_num);
	Eigen::ArrayXd usp_pass_s1 = Eigen::ArrayXd::Zero(sd_num);
	Eigen::ArrayXd usp_pass_s2 = Eigen::ArrayXd::Zero(sd_num);
	Eigen::ArrayXd chp_acc_s1 = Eigen::ArrayXd::Zero(sd_num);
	Eigen::ArrayXd chp_acc = Eigen::ArrayXd::Zero(sd_num);
	Eigen::ArrayXd usp_acc_s1 = Eigen::ArrayXd::Zero(sd_num);
	Eigen::ArrayXd usp_acc = Eigen::ArrayXd::Zero(sd_num);

	std::random_device device;
#pragma omp parallel for num_threads(number_of_threads) collapse(2)
	for (int i = 0; i < sd_num; i++) {
		double sd = start + (end - start) * i / double(sd_num - 1);

		// normal distribution
		
		Eigen::Rand::Vmt19937_64 urng{ device() };
		Eigen::Rand::NormalGen<double> norm_gen{mu, sd};
		Eigen::ArrayXd X = norm_gen.generate<Eigen::ArrayXd>(batch, 1, urng);

		failrate[i] = double((X > upp_lvl || X < low_lvl).count()) / batch;

		//result = np.array([pass_s1, pass_s2, fail_s1, fail_s2])
		Eigen::Array4d chp_result = Eigen::Array4d::Zero(4);
		Eigen::Array4d usp_result = Eigen::Array4d::Zero(4);

        Eigen::Rand::UniformIntGen<int> uni_int_gen{0, 19999};
        Eigen::Rand::Vmt19937_64 urng_uni_int{device()};
		std::mt19937 g(device());
		for (int j = 0; j < sim_num; ++j) {
    		// a fast simulation method: generating 100 numbers between 0 and 19999
    		// then choose the unique elements from the 100 numbers
    		// use the first 10 unique numbers as the s1 indice
    		// use the 11th to 30th unique numbers as the s2 indice
            Eigen::ArrayXi rng_idx = uni_int_gen.generate<Eigen::ArrayXi>(100, 1, urng_uni_int);
            std::set<int> unique_idx{ rng_idx.begin(), rng_idx.end() };
            Eigen::ArrayXi unique_idx_30(30, 1);
            int count_elem = 0;
            for (int elem_unique : unique_idx) {
                unique_idx_30(count_elem) = elem_unique;
                count_elem += 1;
                if (count_elem == 30) {
                    break;
                }
            }
			Eigen::ArrayXd s1 = X(Eigen::seq(0, 9));
			Eigen::ArrayXd s2 = X(Eigen::seq(10, 29));
			ChPTest chp = ChPTest(s1, s2, ck1, ck2, ck3, cL);
			USPTest usp = USPTest(s1, s2, uk1, uk2, uL1, uL2);

			chp_result += chp.get_result();
			usp_result += usp.get_result();
		}

		chp_pass_s1[i] = chp_result[0] / double(sim_num);
		chp_pass_s2[i] = chp_result[1] / double(sim_num);
		chp_fail_s1[i] = chp_result[2] / double(sim_num);
		chp_fail_s2[i] = chp_result[3] / double(sim_num);
		usp_pass_s1[i] = usp_result[0] / double(sim_num);
		usp_pass_s2[i] = usp_result[1] / double(sim_num);
		usp_fail_s1[i] = usp_result[2] / double(sim_num);
		usp_fail_s2[i] = usp_result[3] / double(sim_num);

		if (failrate[i] <= 0.05) {
			chp_acc_s1[i] = chp_pass_s1[i];
			chp_acc[i] = chp_pass_s1[i] + chp_pass_s2[i];
			usp_acc_s1[i] = usp_pass_s1[i];
			usp_acc[i] = usp_pass_s1[i] + usp_pass_s2[i];
		}
		else {
			chp_acc_s1[i] = chp_fail_s1[i];
			chp_acc[i] = chp_fail_s1[i] + chp_fail_s2[i];
			usp_acc[i] = usp_fail_s2[i];
		}
	}
	std::vector<Eigen::ArrayXd> result{ failrate, chp_acc_s1, chp_acc, usp_acc_s1, usp_acc, chp_pass_s1, chp_pass_s2, usp_pass_s1, usp_pass_s2, usp_fail_s1,  usp_fail_s2};
	return result;
}

std::vector<Eigen::ArrayXd> sim_mu(double sd, double start, double end, int number_of_threads, int mu_num, int sim_num, double low_lvl, double upp_lvl,
	double ck1, double ck2, double ck3, double cL, double uk1, double uk2, double uL1, double uL2) {
	int batch = 20000;    // simulate 20k tablets in one batch
	Eigen::ArrayXd failrate = Eigen::ArrayXd::Zero(mu_num);
	Eigen::ArrayXd chp_fail_s1 = Eigen::ArrayXd::Zero(mu_num);
	Eigen::ArrayXd chp_fail_s2 = Eigen::ArrayXd::Zero(mu_num);
	Eigen::ArrayXd chp_pass_s1 = Eigen::ArrayXd::Zero(mu_num);
	Eigen::ArrayXd chp_pass_s2 = Eigen::ArrayXd::Zero(mu_num);
	Eigen::ArrayXd usp_fail_s1 = Eigen::ArrayXd::Zero(mu_num);    // no fail in stage 1 test indeed
	Eigen::ArrayXd usp_fail_s2 = Eigen::ArrayXd::Zero(mu_num);
	Eigen::ArrayXd usp_pass_s1 = Eigen::ArrayXd::Zero(mu_num);
	Eigen::ArrayXd usp_pass_s2 = Eigen::ArrayXd::Zero(mu_num);
	Eigen::ArrayXd chp_acc_s1 = Eigen::ArrayXd::Zero(mu_num);
	Eigen::ArrayXd chp_acc = Eigen::ArrayXd::Zero(mu_num);
	Eigen::ArrayXd usp_acc_s1 = Eigen::ArrayXd::Zero(mu_num);
	Eigen::ArrayXd usp_acc = Eigen::ArrayXd::Zero(mu_num);
	std::random_device device;
#pragma omp parallel for num_threads(number_of_threads)  collapse(2)
	for (int i = 0; i < mu_num; i++) {
		double mu = start + (end - start) * i / double(mu_num - 1);

		// normal distribution

		Eigen::Rand::Vmt19937_64 urng{ device() };
		Eigen::Rand::NormalGen<double> norm_gen{ mu, sd };
		Eigen::ArrayXd X = norm_gen.generate<Eigen::ArrayXd>(batch, 1, urng);

		failrate[i] = double((X > upp_lvl || X < low_lvl).count()) / batch;

		//result = np.array([pass_s1, pass_s2, fail_s1, fail_s2])
		Eigen::Array4d chp_result = Eigen::Array4d::Zero(4);
		Eigen::Array4d usp_result = Eigen::Array4d::Zero(4);

		// random draw 30 samples, first 10 as s1, last 20 as s2
		//Eigen::Rand::UniformIntGen<int> uni_int_gen{ 0, 19999 };
		//Eigen::Rand::Vmt19937_64 urng_uni_int{ device() };
		//Eigen::ArrayXi rng_idx = uni_int_gen.generate<Eigen::ArrayXi>(30, 1, urng_uni_int);
		std::mt19937 g(device());

		for (int j = 0; j < sim_num; ++j) {
			//rng_idx = uni_int_gen.generate<Eigen::ArrayXi>(30, 1, urng_uni_int);
			//Eigen::ArrayXi idx_s1 = rng_idx(Eigen::seq(0, 9));
			//Eigen::ArrayXi idx_s2 = rng_idx(Eigen::seq(10, 29));
			//Eigen::Array<double, 10, 1> s1 = X(idx_s1);
			//Eigen::Array<double, 20, 1> s2 = X(idx_s2);

			std::shuffle(X.begin(), X.end(), g);
			Eigen::ArrayXd s1 = X(Eigen::seq(0, 9));
			Eigen::ArrayXd s2 = X(Eigen::seq(10, 29));
			ChPTest chp = ChPTest(s1, s2, ck1, ck2, ck3, cL);
			USPTest usp = USPTest(s1, s2, uk1, uk2, uL1, uL2);
			chp_result += chp.get_result();
			usp_result += usp.get_result();
		}

		chp_pass_s1[i] = chp_result[0] / double(sim_num);
		chp_pass_s2[i] = chp_result[1] / double(sim_num);
		chp_fail_s1[i] = chp_result[2] / double(sim_num);
		chp_fail_s2[i] = chp_result[3] / double(sim_num);
		usp_pass_s1[i] = usp_result[0] / double(sim_num);
		usp_pass_s2[i] = usp_result[1] / double(sim_num);
		usp_fail_s1[i] = usp_result[2] / double(sim_num);
		usp_fail_s2[i] = usp_result[3] / double(sim_num);

		if (failrate[i] <= 0.05) {
			chp_acc_s1[i] = chp_pass_s1[i];
			chp_acc[i] = chp_pass_s1[i] + chp_pass_s2[i];
			usp_acc_s1[i] = usp_pass_s1[i];
			usp_acc[i] = usp_pass_s1[i] + usp_pass_s2[i];
		}
		else {
			chp_acc_s1[i] = chp_fail_s1[i];
			chp_acc[i] = chp_fail_s1[i] + chp_fail_s2[i];
			usp_acc[i] = usp_fail_s2[i];
		}
	}
	std::vector<Eigen::ArrayXd> result{ failrate, chp_acc_s1, chp_acc, usp_acc_s1, usp_acc, chp_pass_s1, chp_pass_s2, usp_pass_s1, usp_pass_s2, usp_fail_s1,  usp_fail_s2 };
	return result;
}

Binding

We have used C++ to write the core part of the experiments and we now want to call the C++ code in Python. Pybind11 provides many conversions between Python and C++, which helps to bind the two languages easily.

//bind.cpp
#include <pybind11/pybind11.h>
#include <pybind11/eigen.h>
#include <pybind11/stl.h>
#include "CUSim.hpp"
#include "CUSim_fast.hpp"

namespace py = pybind11;

PYBIND11_MODULE(cusim, m) {
    m.def("sim_sd", &sim_sd, "CUSim",
        py::arg("mu") = 100, py::arg("start") = 3, py::arg("end") = 15, py::arg("number_of_threads") = 8,
        py::arg("sd_num") = 61, py::arg("sim_num") = 10000, py::arg("low_lvl") = 85, py::arg("upp_lvl") = 115,
        py::arg("ck1") = 2.2, py::arg("ck2") = 0.25, py::arg("ck3") = 1.7, py::arg("cL") = 15,
        py::arg("uk1") = 2.4, py::arg("uk2") = 2.0, py::arg("uL1") = 15, py::arg("uL2") = 25);
    m.def("sim_mu", &sim_mu, "simulate mu",
        py::arg("sd") = 10, py::arg("start") = 90, py::arg("end") = 110, py::arg("number_of_threads") = 8,
        py::arg("mu_num") = 101, py::arg("sim_num") = 10000, py::arg("low_lvl") = 85, py::arg("upp_lvl") = 115,
        py::arg("ck1") = 2.2, py::arg("ck2") = 0.25, py::arg("ck3") = 1.7, py::arg("cL") = 15,
        py::arg("uk1") = 2.4, py::arg("uk2") = 2.0, py::arg("uL1") = 15, py::arg("uL2") = 25);
    m.def("sim_sd_fast", &sim_sd_fast, "simulate sd",
        py::arg("mu") = 100, py::arg("start") = 3, py::arg("end") = 15, py::arg("number_of_threads") = 8,
        py::arg("sd_num") = 61, py::arg("sim_num") = 10000, py::arg("low_lvl") = 85, py::arg("upp_lvl") = 115,
        py::arg("ck1") = 2.2, py::arg("ck2") = 0.25, py::arg("ck3") = 1.7, py::arg("cL") = 15,
        py::arg("uk1") = 2.4, py::arg("uk2") = 2.0, py::arg("uL1") = 15, py::arg("uL2") = 25);
    m.def("sim_mu_fast", &sim_mu_fast, "simulate mu",
        py::arg("sd") = 10, py::arg("start") = 90, py::arg("end") = 110, py::arg("number_of_threads") = 8,
        py::arg("mu_num") = 101, py::arg("sim_num") = 10000, py::arg("low_lvl") = 85, py::arg("upp_lvl") = 115,
        py::arg("ck1") = 2.2, py::arg("ck2") = 0.25, py::arg("ck3") = 1.7, py::arg("cL") = 15,
        py::arg("uk1") = 2.4, py::arg("uk2") = 2.0, py::arg("uL1") = 15, py::arg("uL2") = 25);
}

  1. Comparison Study of the Content Uniformity Tests in Chinese Pharmacopoeia and United States Pharmacopeia Based on Monte Carlo Simulation[J].Chin J Mod Appl Pharm, 2019, 36(19):2405-2410.DOI:10.13748/j.cnki.issn1007-7693.2019.19.008. ↩︎