BASH - REST-API - JSON封装(下)

  |   0 评论   |   1,848 浏览

前文我们使用JQ完成了对JSON参数的读取与操作. 这一步要解决最后一步.怎样在JAVA中调用SHELL,以及怎样良好的与SHELL进行交互. 也就是我们的目标, 封装成一个REST风格的(注: 这里只是JSON格式的处理,并未过多考虑REST的资源定义等)JSON API.

所有代码可以直接参考源代码仓库: https://github.com/jianhong-li/JavaHelloWorld/blob/master/demo-bash-rest-api-for-java/src/main/java/com/jianhongl/fresh/bash/support/BashRunner.java

Java调用Shell方式

Runtime.getRuntime().exec

Java里面操作shell基本有两种方式.一种是使用Runtime.getRuntime().exec(cmd)

@Test
public void testSimpleExecShell() throws IOException {
      String cmd = "ls -lash";
      Process exec = Runtime.getRuntime().exec(cmd);
}

此时我们只是执行了一个ls命令. 但是什么输出也没有拿到. 我们知道ls这个shell命令会把命令执行结果输出到控制台. 但是在java里面调用的时候. 我们怎么获取子程序的执行输出呢. 答案是使用输入流.

注: 这里的输入流是桥接到 子程序 的标准输出的. 同时SHELL程序的交互没有标准的接口. 都是文本化的程序输入作为参数. 标准输出内容作为程序的返回. 就像在shell中我们要获取一个pwd命令的输出的时候会写作: dir='pwd'

image.png

我们使用exec的输入流,就可以获取到子程序的输出流(标准输出). 这样我们就可以得到shell命令的输出.

/**
     * 演示使用 Runtime.getRuntime().exec() 执行 shell
     */
    @Test
    public void testExecShellSuccess() throws IOException {
        String cmd = "ls -lash";
        Process exec = Runtime.getRuntime().exec(cmd);
        // 读取输出流: 读取命令执行后的输出结果
        InputStream inputStream = exec.getInputStream();
        byte[] bytes = new byte[1024];
        int len = 0;
        System.out.println("--------- msg from input stream ---------");
        while ((len = inputStream.read(bytes)) != -1) {
            System.out.println(new String(bytes, 0, len));
        }
        System.out.println("--------- msg from error stream ---------");
        InputStream errorStream = exec.getErrorStream();
        while ((len = errorStream.read(bytes)) != -1) {
            System.out.println(new String(bytes, 0, len));
        }
    }

如果有错误的时候, 我们并不能从标准输出得到任何数据. shell的错误输出是在标准错误输出通道进行输出的. 因此,如果要获取错误.还需要读取错误流:

exec.getErrorStream()

以下是演示代码,因为 /pwd目录一般是不存在的. 因此这个命令执行会失败.

@Test
    public void testExecShellFail() throws IOException {
        String cmd = "ls -lash /pwd";
        Process exec = Runtime.getRuntime().exec(cmd);
        // 读取输出流: 读取命令执行后的输出结果
        InputStream inputStream = exec.getInputStream();

        byte[] bytes = new byte[1024];
        int len = 0;
        System.out.println("--------- msg from input stream ---------");
        while ((len = inputStream.read(bytes)) != -1) {
            System.out.println(new String(bytes, 0, len));
        }
        System.out.println("--------- msg from error stream ---------");
        InputStream errorStream = exec.getErrorStream();
        while ((len = errorStream.read(bytes)) != -1) {
            System.out.println(new String(bytes, 0, len));
        }
    }

对于一般的只执行一个命令.或者执行几个命令的脚本,我们可以用上面这种方式直接返回. 先启动子进程. 然后去读输出. 这样看起来没有什么问题.

或者, 有的时候对输出并不关心. 比如我调用一个shell来地一批数据进行push msg. 我只关心子程序的exit code. 此时我可能没有去读取程序的输出. 因为不关心.

这个时候, 就会出现一个问题. 一个比较隐晦的问题. 在exec中的子进程的输出是通过 pipe重定向到exec的输入的. 如果此时,我们不去读取这个输入. 那这个pipe就可能会阻塞. 最后导致程序不能执行. 阻塞在那里.

这个怎么验证呢? 我写了如下一个代码:

@Test
    public void testBlockReadDelay() throws IOException, InterruptedException {
        // String cmd = "bash -C 'for i in {1..100}; do echo $i; sleep 1;ls -lash; done'";
        String cmd = "bash -C 'for i in {1..100}\n"
            + "do\n"
            + "    echo \"[+] $(date +'%Y-%m-%d %H:%M:%S') [INFO   ]Welcome \\033[32;40;1m$i\\033[0m times: $msg\"\n"
            + "done'";
        //String cmd = "sh " + workingDir + "/echo_hello.sh";

        // 先开始执行命令
        Process exec = Runtime.getRuntime().exec(cmd);
        // 读取输出流: 读取命令执行后的输出结果
        InputStream inputStream = exec.getInputStream();


        logger.info("sleep 20s...");
        // 不马上读取.我们先等待 10 秒钟
        Thread.sleep(20000);
        System.out.println("start read");
        byte[] bytes = new byte[1024];
        int len = 0;
        System.out.println("--------- msg from input stream ---------");
        while ((len = inputStream.read(bytes)) != -1) {
            System.out.println(new String(bytes, 0, len));
        }
    }

此时,由于exec并不能通过bash -c 直接在参数中执行多条命令.因此我把命令写到了shell文件.
命令如下:

#!/bin/bash

msg="Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,Hello World,"

for i in {1..100}
do
    echo "[+] $(date +'%Y-%m-%d %H:%M:%S') [INFO   ]Welcome \033[32;40;1m$i\033[0m times: $msg"
done

在调用部分:

@Test
    public void testBlockReadDelay() throws IOException, InterruptedException {
        
        String cmd = "sh " + workingDir + "/echo_hello.sh";

        // 先开始执行命令
        Process exec = Runtime.getRuntime().exec(cmd);
        // 读取输出流: 读取命令执行后的输出结果
        InputStream inputStream = exec.getInputStream();


        logger.info("sleep 20s...");
        // 不马上读取.我们先等待 10 秒钟
        Thread.sleep(20000);
        System.out.println("start read");
        byte[] bytes = new byte[1024];
        int len = 0;
        System.out.println("--------- msg from input stream ---------");
        while ((len = inputStream.read(bytes)) != -1) {
            System.out.println(new String(bytes, 0, len));
        }
    }

我们通过输出的shell的日志查看每一条日志的时间,就可以看出来这个shell执行有没有阻塞了.
我们得到了如下的日志:

image.png

通过看58行日志以前的内容的大小,我们似乎看到了这个缓冲区大小: 大约 65k.

image.png

我们问一下GPT看看情况:

问: Runtime.getRuntime().exec() 的子程序的标准输出的缓冲区大小是多少

答: 在Java中使用Runtime.getRuntime().exec()创建的子进程的标准输出(以及标准错误输出)都有缓冲区大小限制。这个缓冲区的大小是由底层操作系统决定的,并且在不同的操作系统上可能有所不同。

对于Windows系统,默认情况下,子进程的标准输出缓冲区大小是 4KB(4096字节)。而在Unix/Linux系统上,通常情况下,子进程的标准输出缓冲区大小是 64KB(65536字节)。

然而,需要注意的是,这些值是默认设置,具体的缓冲区大小可能会因为操作系统版本、系统配置等而有所不同。在Java中,你可以通过ProcessBuilder类的redirectOutput方法来改变子进程的标准输出缓冲区的行为,比如重定向到文件而不是默认的内存缓冲区。这可以通过ProcessBuilder.Redirect类实现。

如果你需要处理大量输出或者有特殊的需求,建议使用适当的缓冲机制,以避免缓冲区溢出或阻塞问题。

因此确实是存在一个缓冲区. 且如果满了没有读取.那子程序的后面的命令执行是会阻塞的.

所以,如果是一个程序的输出很大. 或者说你本身又不需要读取这些数据的话, 你仍然是需要读取这些内容. 以便程序不阻塞的往下面执行.

注: 实际Runtime.getRuntime().exec()是可以用bash -c执行多条命令的. 需要传入数组.
比如: Process process = Runtime.getRuntime().exec(new String[]{"bash", "-c", "ls;pwd"});

有了以上的经验后,我们在封装API时, 必须有一个独立的线程来单独的读取IO.否则可能会造成IO阻塞. 最后程序暂停执行. 我们用了一个IO线程专门来读取SHELL的输出:

public class ExecWithIOThread {
    
    private static Logger logger = LoggerFactory.getLogger(BashRunnerWithExec.class);
    private static String workingDir = null;

    static {

        URL url = Thread.currentThread().getContextClassLoader().getResource("default.propeties");
        Preconditions.checkState(url != null);
        workingDir = url.getPath().substring(0, url.getPath().indexOf("demo-bash-rest-api-for-java"))
            + "demo-bash-rest-api-for-java/script";

    }


    @Test
    public void testBlockReadDelay() throws IOException, InterruptedException {

        String cmd = "sh " + workingDir + "/echo_hello.sh";

        // 先开始执行命令
        Process exec = Runtime.getRuntime().exec(cmd);

        Thread t1 = new Thread(new PrintLogRunnable(exec.getInputStream()));
        Thread t2 = new Thread(new PrintLogRunnable(exec.getErrorStream()));
        t1.start();
        t2.start();

        t1.join();
        t2.join();
    }
    
    public static final class PrintLogRunnable implements Runnable {

        private InputStream in;

        public PrintLogRunnable(InputStream in) {
            this.in = in;
        }

        @Override
        public void run() {
            try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in))) {
                String str = null;
                while ((str = bufferedReader.readLine()) != null) {
                    logger.info("{}", str);
                }

            } catch (Exception e) {
                logger.warn("{}", e);
            }
        }
    }
}

此时我们再观察程序的日志输出.我们发现. 此时日志输出已经是连续的的, 没有随着读取的中断(准确说是读取并没有中断) 而程序阻塞执行.

image.png

使用ProcessBuilder

由于exec是直接传任意命令. 因此存在一定的安全风险. 后面的代码都推荐使用: ProcessBuilder
这个对于交互上更友好. 同时对于IO的处理也有更多的选择.

比如对于不需要关心输出的程序,我们可以使用如下代码. 也不用我们自己再去读IO了. 只需要正常的调用完成即可.

@Test
    public void testRunnerWithIOInherit() {
        try {
            ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c",
                                                               "echo currentPID:$$; ls -lash; pwd; echo 'hello world'; pwd;echo currentPID:$$");
            processBuilder.inheritIO(); // 将子进程的输入输出连接到当前Java进程,方便查看输出
            Process process = processBuilder.start();
            int exitCode = process.waitFor(); // 等待子进程执行完毕
            System.out.println("子进程执行完毕,退出代码:" + exitCode);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

核心代码是: 启动与编辑是分开的. 需要单独的调用processBuilder.start() 同时,我们在启动前的设置里面有一个句: processBuilder.inheritIO(); 这样可以将子进程的输入输出连接到当前Java进程,方便查看输出. 而不用我们去读取了

当然对于我们这里要封装API的情况下,流还是需要我们自己读取的. 我们规范流的使用如下:

  1. std_out 只输出JSON_API的返回数据. 没有多余的数据. 这样我们在读取标准输出流的时候就可以得到shell的返回数据了. 且是标准的接口格式.
  2. 其它日志相关的,我们都输出在标准错误. 这样在Java程序中就可以把日志重定向到当前进程.这样可以直接输出. 或者也用一个日志处理线程.来单独的收集日志并进行打印.

以下是一个只重定向错误到当前进程的处理. 标准输出我们还是直接读取.:

@Test
    public void testRunnerRedirectError() {
        try {
            ProcessBuilder processBuilder = new ProcessBuilder("ls", "-lash", "/pwd");
            processBuilder.redirectError(Redirect.INHERIT); // 将子进程的错误输出重定向到当前Java进程
            Process process = processBuilder.start();
            int exitCode = process.waitFor(); // 等待子进程执行完毕
            System.out.println("子进程执行完毕,退出代码:" + exitCode);
        }catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

如果我们也为了让错误日志也通过 logger.info这样的标准日志格式进行封装的话, 那我们也可以使用一个标准的io处理线程来收集这个日志.

我们先实现一个标准的流读取器:

/**
     * 流读取器
     */
    public static class StreamGobbler implements Runnable {

        private InputStream inputStream;
        private Consumer<String> consumer;

        public StreamGobbler(InputStream inputStream, Consumer<String> consumer) {
            this.inputStream = inputStream;
            this.consumer = consumer;
        }

        @Override
        public void run() {
            new BufferedReader(new InputStreamReader(inputStream))
                .lines()
                .forEach(consumer);
        }
    }

这个流读取器,是一个Runnable的代码.需要在一个独立的线程执行. 同时在初始化的时候, 除了传入了输入流,还另外输入了一个流Consumer. 这样这个流读取器就变成了一个标准的io读取器. 然后真正的数据消费用实例化时传入的 consumer来决定怎么处理数据.

首先是API返回数据消费者:

/**
     * APIResponse 数据数据读取器
     */
    public static class APIResponseConsumer {

        private StringBuffer sb = new StringBuffer();
        private char[] tmp = new char[4];
        private boolean start = true;

        public void add(String line) {
            if (start) {
                sb.append(line);
            } else {
                sb.append("\n").append(line);
            }
            start = false;
        }

        public String get() {
            return sb.toString();
        }

        public boolean isJson() {
            if (sb.length() > 0) {
                sb.getChars(0, 1, tmp, 0);
                return tmp[0] == '{';
            }
            return false;
        }

        public <T> APIResponse<T> get(Class<T> clz) {

            String apiJson = get();
            logger.info("===========>apiJson:\n" + apiJson + "\n<===============");
            return JsonUtils.readValue(apiJson, new TypeReference<APIResponse<T>>() {
                @Override
                public Type getType() {
                    return TypeUtils.parameterize(APIResponse.class, clz);
                }
            });
        }
    }

这个消费者会缓存所有的API 返回数据为string. 然后在反序列化的时候,根据调用者传入的class类型再反序列化为具体的Java类.这样就对上层屏蔽掉了底层的Shell交互的细节.

同样, 对于日志我们是要输出到logger.info的. 因此,这里我们不用单独写 consumer.只需要把这个logger的info方法作为consumer方法传入即可.

最后完整的代码如下:

/**
     * ShellLauncher 执行器
     *
     * @param script    脚本名.默认脚本为 default , infra相关API 脚本名为: infra_api
     * @param action    动作 - 目前有:
     *                  default:
     *                  ____ echo
     * @param jsonParam 具体action操作的相关参数.
     * @param openDebug 是否打开调试日志
     * @param clz       返回RestAPI data类的Class
     * @param <T>       返回数据data的泛型
     * @return 操作结果
     */
    public static <T> APIResponse<T> runCommand(
        String script, String action,
        Map<String, Object> jsonParam,
        boolean openDebug,
        Class<T> clz) {

        logger.info("准备执行:script={}", script);
        try {
            // 参数对齐
            jsonParam = jsonParam == null ? new HashMap<>() : jsonParam;

            ProcessBuilder builder = new ProcessBuilder();

            // 设置脚本当前目录
            builder.directory(new File(workingDir));
            // 命令拼接
            builder.command("bash", "launcher",
                            "--script", script,
                            "--action", action,
                            "--json", JsonUtils.writeValue(jsonParam),
                            openDebug ? "-x" : " ");

            // 启动任务
            Process process = builder.start();

            APIResponseConsumer apiResponseConsumer = new APIResponseConsumer();
            StreamGobbler streamGobbler = new StreamGobbler(process.getInputStream(), apiResponseConsumer::add);

            Future<?> future = executorService.submit(streamGobbler);
            executorService.submit(new StreamGobbler(process.getErrorStream(), logger::info));

            int exitCode = process.waitFor();

            future.get(30, TimeUnit.MINUTES);

            // 正常返回的,返回包装的JSON,不正常返回的. 包装为规范的错误.
            if (exitCode == 0 || apiResponseConsumer.isJson()) {
                return apiResponseConsumer.get(clz);
            } else {
                return APIResponse.returnFail(exitCode, apiResponseConsumer.get());
            }
        } catch (Exception e) {
            logger.error("bashRunner_exe_failed:msg={}", e.getMessage(), e);
            return APIResponse.returnFail(-1, e.getMessage());
        }
    }

评论

发表评论


取消