diff --git a/csmock/plugins/snyk.py b/csmock/plugins/snyk.py index 40a7d3f..853188d 100644 --- a/csmock/plugins/snyk.py +++ b/csmock/plugins/snyk.py @@ -57,10 +57,17 @@ def enable(self): self.enabled = True def init_parser(self, parser): - parser.add_argument( - "--snyk-bin-url", default=SNYK_BIN_URL, + # Create mutually exclusive group for snyk binary source + snyk_source_group = parser.add_mutually_exclusive_group() + + snyk_source_group.add_argument( + "--snyk-bin-url", help="URL to download snyk binary executable") + snyk_source_group.add_argument( + "--snyk-bin", + help="path to local snyk binary executable (skips download)") + parser.add_argument( "--snyk-auth", default="~/.config/configstore/snyk.json", help="file containing snyk authentication token") @@ -88,6 +95,10 @@ def handle_args(self, parser, args, props): # sanitize options passed to --snyk-code-test-opts to avoid shell injection self.snyk_code_test_opts = sanitize_opts_arg(parser, args, "--snyk-code-test-opts") + # apply default URL if neither snyk source option is specified + if not args.snyk_bin_url and not args.snyk_bin: + args.snyk_bin_url = SNYK_BIN_URL + # check whether we have access to snyk authentication token self.auth_token_src = os.path.expanduser(args.snyk_auth) if not os.access(self.auth_token_src, os.R_OK): @@ -97,33 +108,48 @@ def handle_args(self, parser, args, props): props.imp_checker_set.add("SNYK_CODE_WARNING") props.imp_csgrep_filters.append(("SNYK_CODE_WARNING", "--event=^error")) - # fetch snyk using the given URL - def fetch_snyk_hook(results, props): + def fetch_snyk(results): + # download snyk binary cache_dir = args.snyk_cache_dir try: # make sure the cache directory exists os.makedirs(cache_dir, mode=0o755, exist_ok=True) except OSError: - results.error("failed to create snyk cache directory: %s" % cache_dir) + results.error(f"failed to create snyk cache directory: {cache_dir}") return 1 url = args.snyk_bin_url snyk_bin_name = url.split("/")[-1] - self.snyk_bin = os.path.join(cache_dir, snyk_bin_name) + snyk_bin = os.path.join(cache_dir, snyk_bin_name) - if not args.snyk_refresh and os.path.exists(self.snyk_bin): - results.print_with_ts("reusing previously downloaded snyk executable: " + self.snyk_bin) + if not args.snyk_refresh and os.path.exists(snyk_bin): + results.print_with_ts(f"reusing previously downloaded snyk executable: {self.snyk_bin}") else: # fetch the binary executable - ec = results.exec_cmd(['curl', '-Lfso', self.snyk_bin, url]) + ec = results.exec_cmd(["curl", "-Lfso", snyk_bin, url]) if 0 != ec: - results.error("failed to download snyk binary executable: %s" % url) + results.error(f"failed to download snyk binary executable: {url}") return ec # add eXecute permission on the downloaded file - os.chmod(self.snyk_bin, 0o755) + os.chmod(snyk_bin, 0o755) + + return snyk_bin + + # fetch snyk using the given URL or use local binary + def fetch_snyk_hook(results, props): + if args.snyk_bin: + # use local snyk binary + self.snyk_bin = os.path.expanduser(args.snyk_bin) + results.print_with_ts("using local snyk executable: " + self.snyk_bin) + else: + self.snyk_bin = fetch_snyk(results) + + # validate snyk binary (common path for both local and downloaded) + if not os.path.exists(self.snyk_bin): + results.error("snyk binary does not exist: %s" % self.snyk_bin) + return 2 - # check whether we have eXecute access if not os.access(self.snyk_bin, os.X_OK): results.error("snyk binary is not executable: %s" % self.snyk_bin) return 2