Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 38 additions & 12 deletions csmock/plugins/snyk.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,17 @@ def enable(self):
self.enabled = True

def init_parser(self, parser):
parser.add_argument(
"--snyk-bin-url", default=SNYK_BIN_URL,
# Create mutually exclusive group for snyk binary source
snyk_source_group = parser.add_mutually_exclusive_group()

snyk_source_group.add_argument(
"--snyk-bin-url",
help="URL to download snyk binary executable")

snyk_source_group.add_argument(
"--snyk-bin",
help="path to local snyk binary executable (skips download)")

parser.add_argument(
"--snyk-auth", default="~/.config/configstore/snyk.json",
help="file containing snyk authentication token")
Expand Down Expand Up @@ -88,6 +95,10 @@ def handle_args(self, parser, args, props):
# sanitize options passed to --snyk-code-test-opts to avoid shell injection
self.snyk_code_test_opts = sanitize_opts_arg(parser, args, "--snyk-code-test-opts")

# apply default URL if neither snyk source option is specified
if not args.snyk_bin_url and not args.snyk_bin:
args.snyk_bin_url = SNYK_BIN_URL

# check whether we have access to snyk authentication token
self.auth_token_src = os.path.expanduser(args.snyk_auth)
if not os.access(self.auth_token_src, os.R_OK):
Expand All @@ -97,33 +108,48 @@ def handle_args(self, parser, args, props):
props.imp_checker_set.add("SNYK_CODE_WARNING")
props.imp_csgrep_filters.append(("SNYK_CODE_WARNING", "--event=^error"))

# fetch snyk using the given URL
def fetch_snyk_hook(results, props):
def fetch_snyk(results):
# download snyk binary
cache_dir = args.snyk_cache_dir
try:
# make sure the cache directory exists
os.makedirs(cache_dir, mode=0o755, exist_ok=True)
except OSError:
results.error("failed to create snyk cache directory: %s" % cache_dir)
results.error(f"failed to create snyk cache directory: {cache_dir}")
return 1

url = args.snyk_bin_url
snyk_bin_name = url.split("/")[-1]
self.snyk_bin = os.path.join(cache_dir, snyk_bin_name)
snyk_bin = os.path.join(cache_dir, snyk_bin_name)

if not args.snyk_refresh and os.path.exists(self.snyk_bin):
results.print_with_ts("reusing previously downloaded snyk executable: " + self.snyk_bin)
if not args.snyk_refresh and os.path.exists(snyk_bin):
results.print_with_ts(f"reusing previously downloaded snyk executable: {self.snyk_bin}")
else:
# fetch the binary executable
ec = results.exec_cmd(['curl', '-Lfso', self.snyk_bin, url])
ec = results.exec_cmd(["curl", "-Lfso", snyk_bin, url])
if 0 != ec:
results.error("failed to download snyk binary executable: %s" % url)
results.error(f"failed to download snyk binary executable: {url}")
return ec

# add eXecute permission on the downloaded file
os.chmod(self.snyk_bin, 0o755)
os.chmod(snyk_bin, 0o755)

return snyk_bin

# fetch snyk using the given URL or use local binary
def fetch_snyk_hook(results, props):
if args.snyk_bin:
# use local snyk binary
self.snyk_bin = os.path.expanduser(args.snyk_bin)
results.print_with_ts("using local snyk executable: " + self.snyk_bin)
else:
self.snyk_bin = fetch_snyk(results)

# validate snyk binary (common path for both local and downloaded)
if not os.path.exists(self.snyk_bin):
results.error("snyk binary does not exist: %s" % self.snyk_bin)
return 2

# check whether we have eXecute access
if not os.access(self.snyk_bin, os.X_OK):
results.error("snyk binary is not executable: %s" % self.snyk_bin)
return 2
Expand Down