// File: interchiplet.cpp
// Path: interchiplet/srcs/interchiplet.cpp

#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>

#include "benchmark_yaml.h"
#include "cmd_handler.h"
#include "cmdline_options.h"
#include "spdlog/spdlog.h"

/**
 * @brief Per-subprocess state shared between the launcher and its bridge thread.
 *
 * The configuration fields are copied from a ProcessConfig; the identification
 * fields and the synchronization pointer are filled in by the caller before the
 * bridge thread is started.
 */
class ProcessStruct {
   public:
    /**
     * @brief Copy the launch configuration and reset all runtime fields.
     * @param __config Configuration of the process to launch.
     */
    ProcessStruct(const ProcessConfig& __config)
        : m_command(__config.m_command),
          m_args(__config.m_args),
          m_log_file(__config.m_log_file),
          m_to_stdout(__config.m_to_stdout),
          m_clock_rate(__config.m_clock_rate),
          m_pre_copy(__config.m_pre_copy),
          m_unfinished_line(),
          // Identification fields get a defined value so they are never read
          // uninitialized when a caller forgets to assign them.
          m_round(0),
          m_phase(0),
          m_thread(0),
          m_thread_id(),
          m_pid(-1),
          m_pid2(-1),
          m_sync_struct(NULL) {}

   public:
    // Configuration.
    std::string m_command;            // Executable passed to execvp().
    std::vector<std::string> m_args;  // Arguments following the executable.
    std::string m_log_file;           // Log file name, created inside the process sub-directory.
    bool m_to_stdout;                 // Echo the child's output to this program's stdout/stderr.
    double m_clock_rate;              // Divisor applied to the cycle of every parsed command.
    std::string m_pre_copy;           // Source operand(s) of the `cp` run before exec; empty = skip.

    // Tail of the last output chunk that did not end with '\n'; it is prepended
    // to the next chunk before commands are parsed.
    std::string m_unfinished_line;

    // Identify
    int m_round;   // Round number; used in the sub-directory name.
    int m_phase;   // Phase number; used in the sub-directory name.
    int m_thread;  // Thread index; used in the sub-directory name.

    pthread_t m_thread_id;  // Handle of the bridge thread serving this process.
    int m_pid;              // Initialized to -1; not written elsewhere in the visible code.
    int m_pid2;             // PID of the forked child process (-1 until it is forked).

    SyncStruct* m_sync_struct;  // Shared synchronization state; not owned by this object.
};

void parse_command(char* __pipe_buf, ProcessStruct* __proc_struct, int __stdin_fd) {
    // Split line by '\n'
    std::string line = std::string(__pipe_buf);
    std::vector<std::string> lines;

    int start_idx = 0;
    for (std::size_t i = 0; i < line.size(); i++) {
        if (line[i] == '\n') {
            std::string l = line.substr(start_idx, i + 1 - start_idx);
            start_idx = i + 1;
            lines.push_back(l);
        }
    }
    if (start_idx < line.size()) {
        std::string l = line.substr(start_idx, line.size() - start_idx);
        lines.push_back(l);
    }

    // Unfinished line.
    if (__proc_struct->m_unfinished_line.size() > 0) {
        lines[0] = __proc_struct->m_unfinished_line + lines[0];
        __proc_struct->m_unfinished_line = "";
    }
    if (lines[lines.size() - 1].find('\n') == -1) {
        __proc_struct->m_unfinished_line = lines[lines.size() - 1];
        lines.pop_back();
    }

    // Get line start with [INTERCMD]
    for (std::size_t i = 0; i < lines.size(); i++) {
        std::string l = lines[i];
        if (l.substr(0, 10) == "[INTERCMD]") {
            InterChiplet::SyncCommand cmd = InterChiplet::parseCmd(l);
            cmd.m_stdin_fd = __stdin_fd;
            cmd.m_clock_rate = __proc_struct->m_clock_rate;
            cmd.m_cycle = cmd.m_cycle / __proc_struct->m_clock_rate;

            pthread_mutex_lock(&__proc_struct->m_sync_struct->m_mutex);

            // Check order and clear delay infomation.
            if (cmd.m_type == InterChiplet::SC_BARRIER || cmd.m_type == InterChiplet::SC_SEND ||
                cmd.m_type == InterChiplet::SC_LOCK || cmd.m_type == InterChiplet::SC_UNLOCK ||
                cmd.m_type == InterChiplet::SC_LAUNCH) {
                InterChiplet::SyncCommand write_cmd = cmd;
                if (cmd.m_type == InterChiplet::SC_BARRIER) {
                    write_cmd.m_desc |= InterChiplet::SPD_BARRIER;
                    write_cmd.m_desc |= write_cmd.m_nbytes;
                } else if (cmd.m_type == InterChiplet::SC_LOCK) {
                    write_cmd.m_desc |= InterChiplet::SPD_LOCK;
                } else if (cmd.m_type == InterChiplet::SC_UNLOCK) {
                    write_cmd.m_desc |= InterChiplet::SPD_UNLOCK;
                } else if (cmd.m_type == InterChiplet::SC_LAUNCH) {
                    write_cmd.m_desc |= InterChiplet::SPD_LAUNCH;
                }
                if (!__proc_struct->m_sync_struct->m_delay_list.checkOrderOfCommand(write_cmd)) {
                    spdlog::warn("Command order has changed. Delay information is canncelled.");
                    __proc_struct->m_sync_struct->m_delay_list.clearDelayInfo();
                }
            }

            // Call functions to handle corresponding command.
            switch (cmd.m_type) {
                case InterChiplet::SC_CYCLE:
                    handle_cycle_cmd(cmd, __proc_struct->m_sync_struct);
                    break;
                case InterChiplet::SC_BARRIER:
                    handle_barrier_cmd(cmd, __proc_struct->m_sync_struct);
                    break;
                case InterChiplet::SC_SEND:
                case InterChiplet::SC_RECEIVE:
                    handle_pipe_cmd(cmd, __proc_struct->m_sync_struct);
                    break;
                case InterChiplet::SC_LOCK:
                    handle_lock_cmd(cmd, __proc_struct->m_sync_struct);
                    break;
                case InterChiplet::SC_UNLOCK:
                    handle_unlock_cmd(cmd, __proc_struct->m_sync_struct);
                    break;
                case InterChiplet::SC_LAUNCH:
                    handle_launch_cmd(cmd, __proc_struct->m_sync_struct);
                    break;
                case InterChiplet::SC_WAITLAUNCH:
                    handle_waitlaunch_cmd(cmd, __proc_struct->m_sync_struct);
                    break;
                case InterChiplet::SC_READ:
                    handle_read_cmd(cmd, __proc_struct->m_sync_struct);
                    break;
                case InterChiplet::SC_WRITE:
                    handle_write_cmd(cmd, __proc_struct->m_sync_struct);
                    break;
                default:
                    break;
            }

            pthread_mutex_unlock(&__proc_struct->m_sync_struct->m_mutex);
        }
    }
}

// Maximum number of bytes requested by a single read() from a child's
// stdout/stderr pipe; the receiving buffer holds one extra byte for the NUL.
#define PIPE_BUF_SIZE 1024

void* bridge_thread(void* __args_ptr) {
    ProcessStruct* proc_struct = (ProcessStruct*)__args_ptr;

    int pipe_stdin[2];   // Pipe to send data to child process
    int pipe_stdout[2];  // Pipe to receive data from child process
    int pipe_stderr[2];  // Pipe to receive data from child process

    // Create pipes
    if (pipe(pipe_stdin) == -1 || pipe(pipe_stdout) == -1 || pipe(pipe_stderr) == -1) {
        perror("pipe");
        exit(EXIT_FAILURE);
    }

    // Create sub directory for subprocess.
    char* sub_dir_path = new char[128];
    sprintf(sub_dir_path, "./proc_r%d_p%d_t%d", proc_struct->m_round, proc_struct->m_phase,
            proc_struct->m_thread);
    if (access(sub_dir_path, F_OK) == -1) {
        mkdir(sub_dir_path, 0775);
    }

    // Fork a child process
    pid_t pid = fork();
    if (pid == -1) {
        perror("fork");
        exit(EXIT_FAILURE);
    }

    if (pid == 0) {  // Child process
        // Close unnecessary pipe ends
        close(pipe_stdin[1]);
        close(pipe_stdout[0]);
        close(pipe_stderr[0]);

        // Redirect stdin and stdout to pipes
        dup2(pipe_stdin[0], STDIN_FILENO);
        dup2(pipe_stdout[1], STDOUT_FILENO);
        dup2(pipe_stderr[1], STDERR_FILENO);

        // Change directory to sub-process.
        if (chdir(sub_dir_path) < 0) {
            perror("chdir");
        }
        // TODO: Copy necessary configuration file.
        if (!proc_struct->m_pre_copy.empty()) {
            std::string cp_cmd = std::string("cp ") + proc_struct->m_pre_copy + " .";
            if (system(cp_cmd.c_str()) != 0) {
                perror("system");
            }
        }

        std::cout << "CWD: " << get_current_dir_name() << std::endl;

        // Build arguments.
        int argc = proc_struct->m_args.size();
        char** args_list = new char*[argc + 2];
        args_list[0] = new char[proc_struct->m_command.size() + 1];
        strcpy(args_list[0], proc_struct->m_command.c_str());
        args_list[0][proc_struct->m_command.size()] = '\0';
        for (int i = 0; i < argc; i++) {
            int arg_len = proc_struct->m_args[i].size();
            args_list[i + 1] = new char[arg_len + 1];
            strcpy(args_list[i + 1], proc_struct->m_args[i].c_str());
            args_list[i + 1][arg_len] = '\0';
        }
        args_list[argc + 1] = NULL;

        // Execute the child program
        std::cout << "Exec: ";
        for (int i = 0; i < proc_struct->m_args.size() + 1; i++) {
            std::cout << " " << args_list[i];
        }
        std::cout << std::endl;
        execvp(args_list[0], args_list);

        // If execl fails, it means the child process couldn't be started
        perror("execvp");
        exit(EXIT_FAILURE);
    } else {  // Parent process
        spdlog::info("Start simulation process {}. Command: {}", pid, proc_struct->m_command);
        proc_struct->m_pid2 = pid;

        // Close unnecessary pipe ends
        close(pipe_stdin[0]);
        close(pipe_stdout[1]);
        close(pipe_stderr[1]);
        int stdin_fd = pipe_stdin[1];
        int stdout_fd = pipe_stdout[0];
        int stderr_fd = pipe_stderr[0];

        pollfd fd_list[2] = {{fd : stdout_fd, events : POLL_IN},
                             {fd : stderr_fd, events : POLL_IN}};

        // Move log to subfolder.
        std::ofstream log_file(std::string(sub_dir_path) + "/" + proc_struct->m_log_file);

        // Write execution start time to log file.
        std::time_t t = std::time(0);
        std::tm* now = std::localtime(&t);
        log_file << "Execution starts at " << (now->tm_year + 1900) << "-" << (now->tm_mon + 1)
                 << "-" << now->tm_mday << "  " << (now->tm_hour) << ":" << (now->tm_min) << ":"
                 << (now->tm_sec) << std::endl;

        char* pipe_buf = new char[PIPE_BUF_SIZE + 1];
        bool to_stdout = proc_struct->m_to_stdout;
        int res = 0;
        while (true) {
            int res = poll(fd_list, 2, 1000);
            if (res == -1) {
                perror("poll");
                break;
            }

            bool has_stdout = false;
            if (fd_list[0].revents & POLL_IN) {
                has_stdout = true;
                int res = read(stdout_fd, pipe_buf, PIPE_BUF_SIZE);
                if (res <= 0) break;
                pipe_buf[res] = '\0';
                // log redirection.
                log_file.write(pipe_buf, res).flush();
                if (to_stdout) {
                    std::cout.write(pipe_buf, res);
                    std::cout.flush();
                }
                // Parse command in pipe_buf
                parse_command(pipe_buf, proc_struct, stdin_fd);
            }
            if (fd_list[1].revents & POLL_IN) {
                has_stdout = true;
                int res = read(stderr_fd, pipe_buf, PIPE_BUF_SIZE);
                if (res <= 0) break;
                pipe_buf[res] = '\0';
                // log redirection.
                log_file.write(pipe_buf, res).flush();
                if (to_stdout) {
                    std::cerr.write(pipe_buf, res);
                    std::cerr.flush();
                }
                // Parse command in pipe_buf
                parse_command(pipe_buf, proc_struct, stdin_fd);
            }

            // Check the status of child process and quit.
            int status;
            if (!has_stdout && (waitpid(pid, &status, WNOHANG) > 0)) {
                // Optionally handle child process termination status
                if (status == 0) {
                    spdlog::info("Simulation process {} terminate with status = {}.",
                                 proc_struct->m_pid2, status);
                } else {
                    spdlog::error("Simulation process {} terminate with status = {}.",
                                  proc_struct->m_pid2, status);
                }
                break;
            }
        }

        delete pipe_buf;
    }

    return 0;
}

/**
 * @brief Run phase one of a round: start all phase-one processes and wait for them.
 *
 * Loads delay records from "delayInfo.txt", starts one bridge thread per
 * phase-one process, joins them, removes the files reported by the pipe
 * structure, and dumps the benchmark records to "bench.txt".
 *
 * @param __round                 Round number, used to label the processes.
 * @param __proc_phase1_cfg_list  Configurations of the processes to launch.
 * @param __proc_phase2_cfg_list  Phase-two configurations; the clock rate of the
 *                                first entry is used for loading and dumping records.
 * @return The cycle reported by the cycle structure after all processes finished.
 */
InterChiplet::InnerTimeType __loop_phase_one(
    int __round, const std::vector<ProcessConfig>& __proc_phase1_cfg_list,
    const std::vector<ProcessConfig>& __proc_phase2_cfg_list) {
    // Create synchronize data structure.
    SyncStruct* g_sync_structure = new SyncStruct();

    // Load delay record.
    // NOTE(review): indexes element 0 unconditionally; the phase-two list is
    // presumably guaranteed to be non-empty by the configuration loader.
    g_sync_structure->m_delay_list.loadDelayInfo("delayInfo.txt",
                                                 __proc_phase2_cfg_list[0].m_clock_rate);
    spdlog::info("Load {} delay records.", g_sync_structure->m_delay_list.size());

    // Create multi-thread.
    int thread_i = 0;
    std::vector<ProcessStruct*> proc_struct_list;
    for (const auto& proc_cfg : __proc_phase1_cfg_list) {
        ProcessStruct* proc_struct = new ProcessStruct(proc_cfg);
        proc_struct->m_round = __round;
        proc_struct->m_phase = 1;
        proc_struct->m_thread = thread_i;
        proc_struct->m_sync_struct = g_sync_structure;

        // pthread_create() returns 0 on success and a positive error number on
        // failure; it does not set errno, so perror() cannot be used here.
        const int res = pthread_create(&(proc_struct->m_thread_id), NULL, bridge_thread,
                                       static_cast<void*>(proc_struct));
        if (res != 0) {
            std::fprintf(stderr, "pthread_create: %s\n", std::strerror(res));
            exit(EXIT_FAILURE);
        }

        proc_struct_list.push_back(proc_struct);
        thread_i++;
    }

    // Wait threads to finish.
    for (auto& proc_struct : proc_struct_list) {
        pthread_join(proc_struct->m_thread_id, NULL);
        delete proc_struct;
    }
    spdlog::info("All process has exit.");

    // Remove file.
    for (auto& pipe_name : g_sync_structure->m_pipe_struct.pipeSet()) {
        remove(pipe_name.c_str());
    }

    // Dump benchmark record.
    g_sync_structure->m_bench_list.dumpBench("bench.txt", __proc_phase2_cfg_list[0].m_clock_rate);
    spdlog::info("Dump {} bench records.", g_sync_structure->m_bench_list.size());

    // Destroy global synchronize structure, and return total cycle.
    InterChiplet::InnerTimeType res_cycle = g_sync_structure->m_cycle_struct.cycle();
    delete g_sync_structure;
    return res_cycle;
}

/**
 * @brief Run phase two of a round: start all phase-two processes and wait for them.
 *
 * Starts one bridge thread per configuration with a fresh synchronization
 * structure, joins all threads, and releases the structure.
 *
 * @param __round          Round number, used to label the processes.
 * @param __proc_cfg_list  Configurations of the processes to launch.
 */
void __loop_phase_two(int __round, const std::vector<ProcessConfig>& __proc_cfg_list) {
    // Create synchronize data structure.
    SyncStruct* g_sync_structure = new SyncStruct();

    // Create multi-thread.
    int thread_i = 0;
    std::vector<ProcessStruct*> proc_struct_list;
    for (const auto& proc_cfg : __proc_cfg_list) {
        ProcessStruct* proc_struct = new ProcessStruct(proc_cfg);
        proc_struct->m_round = __round;
        proc_struct->m_phase = 2;
        proc_struct->m_thread = thread_i;
        proc_struct->m_sync_struct = g_sync_structure;

        // pthread_create() returns 0 on success and a positive error number on
        // failure; it does not set errno, so perror() cannot be used here.
        const int res = pthread_create(&(proc_struct->m_thread_id), NULL, bridge_thread,
                                       static_cast<void*>(proc_struct));
        if (res != 0) {
            std::fprintf(stderr, "pthread_create: %s\n", std::strerror(res));
            exit(EXIT_FAILURE);
        }

        proc_struct_list.push_back(proc_struct);
        // Advance the index exactly once per process so thread numbers (and the
        // sub-directory names derived from them) are consecutive, as in phase one.
        thread_i++;
    }

    // Wait threads to finish.
    for (auto& proc_struct : proc_struct_list) {
        pthread_join(proc_struct->m_thread_id, NULL);
        delete proc_struct;
    }
    spdlog::info("All process has exit.");

    // Destroy global synchronize structure.
    delete g_sync_structure;
}

int main(int argc, const char* argv[]) {
    // Parse command line.
    CmdLineOptions options;
    if (options.parse(argc, argv) != 0) {
        return 0;
    };

    // Initializate logging
    if (options.m_debug) {
        spdlog::set_level(spdlog::level::debug);
    }
    spdlog::info("==== LegoSim Chiplet Simulator ====");

    // Change working directory if --cwd is specified.
    if (options.m_cwd != "") {
        if (access(options.m_cwd.c_str(), F_OK) == 0) {
            if (chdir(options.m_cwd.c_str()) < 0) {
                perror("chdir");
            }
            spdlog::info("Change working directory {}.", get_current_dir_name());
        }
    }

    // Check exist of benchmark configuration yaml.
    if (access(options.m_bench.c_str(), F_OK) < 0) {
        spdlog::error("Cannot find benchmark {}.", options.m_bench);
        exit(EXIT_FAILURE);
    }

    // Load benchmark configuration.
    BenchmarkConfig configs(options.m_bench);
    spdlog::info("Load benchmark configuration from {}.", options.m_bench);

    // Get start time of simulation.
    struct timeval simstart, simend, roundstart, roundend;
    gettimeofday(&simstart, 0);

    InterChiplet::InnerTimeType sim_cycle = 0;
    for (int round = 1; round <= options.m_timeout_threshold; round++) {
        // Get start time of one round.
        gettimeofday(&roundstart, 0);
        spdlog::info("**** Round {} Phase 1 ****", round);
        InterChiplet::InnerTimeType round_cycle =
            __loop_phase_one(round, configs.m_phase1_proc_cfg_list, configs.m_phase2_proc_cfg_list);

        // Get simulation cycle.
        // If simulation cycle this round is close to the previous one, quit iteration.
        spdlog::info("Benchmark elapses {} cycle.",
                     static_cast<InterChiplet::TimeType>(round_cycle));
        if (round > 1) {
            // Calculate error of simulation cycle.
            double err_rate = ((double)round_cycle - (double)sim_cycle) / (double)round_cycle;
            err_rate = err_rate < 0 ? -err_rate : err_rate;
            spdlog::info("Difference related to pervious round is {}%.", err_rate * 100);
            // If difference is small enough, quit loop.
            if (err_rate < options.m_err_rate_threshold) {
                spdlog::info("Quit simulation because simulation cycle has converged.");
                sim_cycle = round_cycle;
                break;
            }
        }
        sim_cycle = round_cycle;

        spdlog::info("**** Round {} Phase 2 ***", round);
        __loop_phase_two(round, configs.m_phase2_proc_cfg_list);

        // Get end time of one round.
        gettimeofday(&roundend, 0);
        unsigned long elaped_sec = roundend.tv_sec - roundstart.tv_sec;
        spdlog::info("Round {} elapses {}d {}h {}m {}s.", round, elaped_sec / 3600 / 24,
                     (elaped_sec / 3600) % 24, (elaped_sec / 60) % 60, elaped_sec % 60);
    }

    // Get end time of simulation.
    spdlog::info("**** End of Simulation ****");
    gettimeofday(&simend, 0);
    unsigned long elaped_sec = simend.tv_sec - simstart.tv_sec;
    spdlog::info("Benchmark elapses {} cycle.", static_cast<InterChiplet::TimeType>(sim_cycle));
    spdlog::info("Simulation elapseds {}d {}h {}m {}s.", elaped_sec / 3600 / 24,
                 (elaped_sec / 3600) % 24, (elaped_sec / 60) % 60, elaped_sec % 60);
}