Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 38 additions & 28 deletions src/urlscan/pro/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

from typing import Any

from urlscan.client import BaseClient, _compact
from urlscan.client import BaseClient
from urlscan.types import (
ChannelPermissionType,
ChannelTypeType,
FrequencyType,
WeekDaysType,
)
from urlscan.utils import _compact, _merge


class Channel(BaseClient):
Expand Down Expand Up @@ -40,6 +41,7 @@ def create(
ignore_time: bool | None = None,
week_days: list[WeekDaysType] | None = None,
permissions: list[ChannelPermissionType] | None = None,
**kwargs: Any,
) -> dict:
"""Create a new channel.

Expand All @@ -55,6 +57,7 @@ def create(
ignore_time (bool | None, optional): Whether to ignore time constraints. Defaults to None.
week_days (list[WeekDaysType] | None, optional): Days of the week alerts will be generated (Monday, Tuesday, Wednesday, Thursday, Friday, Saturday, Sunday). Defaults to None.
permissions (list[ChannelPermissionType] | None, optional): Permissions associated with this channel (team:read, team:write). Defaults to None.
**kwargs: Additional parameters to include in the request payload.

Returns:
dict: Object containing the created channel.
Expand All @@ -64,19 +67,22 @@ def create(

"""
channel: dict[str, Any] = _compact(
{
"type": channel_type,
"name": name,
"webhookURL": webhook_url,
"frequency": frequency,
"emailAddresses": email_addresses,
"utcTime": utc_time,
"isActive": is_active,
"isDefault": is_default,
"ignoreTime": ignore_time,
"weekDays": week_days,
"permissions": permissions,
}
_merge(
{
"type": channel_type,
"name": name,
"webhookURL": webhook_url,
"frequency": frequency,
"emailAddresses": email_addresses,
"utcTime": utc_time,
"isActive": is_active,
"isDefault": is_default,
"ignoreTime": ignore_time,
"weekDays": week_days,
"permissions": permissions,
},
kwargs,
)
)
data = {"channel": channel}

Expand Down Expand Up @@ -113,6 +119,7 @@ def update(
ignore_time: bool | None = None,
week_days: list[WeekDaysType] | None = None,
permissions: list[ChannelPermissionType] | None = None,
**kwargs: Any,
) -> dict:
"""Update an existing channel.

Expand All @@ -129,29 +136,32 @@ def update(
ignore_time (bool | None, optional): Whether to ignore time constraints. Defaults to None.
week_days (list[WeekDaysType] | None, optional): Days of the week alerts will be generated (Monday, Tuesday, Wednesday, Thursday, Friday, Saturday, Sunday). Defaults to None.
permissions (list[ChannelPermissionType] | None, optional): Permissions associated with this channel (team:read, team:write). Defaults to None.
**kwargs: Additional parameters to include in the request payload.

Returns:
dict: Object containing the updated channel.


Reference:
https://docs.urlscan.io/apis/urlscan-openapi/channels/channelsupdate

"""
channel: dict[str, Any] = _compact(
{
"type": channel_type,
"name": name,
"webhookURL": webhook_url,
"frequency": frequency,
"emailAddresses": email_addresses,
"utcTime": utc_time,
"isActive": is_active,
"isDefault": is_default,
"ignoreTime": ignore_time,
"weekDays": week_days,
"permissions": permissions,
}
_merge(
{
"type": channel_type,
"name": name,
"webhookURL": webhook_url,
"frequency": frequency,
"emailAddresses": email_addresses,
"utcTime": utc_time,
"isActive": is_active,
"isDefault": is_default,
"ignoreTime": ignore_time,
"weekDays": week_days,
"permissions": permissions,
},
kwargs,
)
)
data = {"channel": channel}

Expand Down
87 changes: 49 additions & 38 deletions src/urlscan/pro/incident.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
ScanIntervalModeType,
WatchedAttributeType,
)
from urlscan.utils import _merge


class Incident(BaseClient):
Expand All @@ -33,6 +34,7 @@ def create(
scan_interval_after_malicious: int | None = None,
incident_profile: str | None = None,
expire_after: int | None = None,
**kwargs: Any,
) -> dict:
"""Create an incident with specific options.

Expand All @@ -54,6 +56,7 @@ def create(
scan_interval_after_malicious (int | None, optional): How to change the scan interval after the observable became malicious. Defaults to None.
incident_profile (str | None, optional): ID of the incident profile to use when creating this incident. Defaults to None.
expire_after (int | None, optional): Seconds until the incident will automatically be closed. Defaults to None.
**kwargs: Additional parameters to include in the request payload.

Returns:
dict: Incident body.
Expand All @@ -63,25 +66,28 @@ def create(

"""
incident: dict[str, Any] = _compact(
{
"observable": observable,
"visibility": visibility,
"channels": channels,
"scanInterval": scan_interval,
"scanIntervalMode": scan_interval_mode,
"watchedAttributes": watched_attributes,
"userAgents": user_agents,
"userAgentsPerInterval": user_agents_per_interval,
"countries": countries,
"countriesPerInterval": countries_per_interval,
"stopDelaySuspended": stop_delay_suspended,
"stopDelayInactive": stop_delay_inactive,
"stopDelayMalicious": stop_delay_malicious,
"scanIntervalAfterSuspended": scan_interval_after_suspended,
"scanIntervalAfterMalicious": scan_interval_after_malicious,
"incidentProfile": incident_profile,
"expireAfter": expire_after,
}
_merge(
{
"observable": observable,
"visibility": visibility,
"channels": channels,
"scanInterval": scan_interval,
"scanIntervalMode": scan_interval_mode,
"watchedAttributes": watched_attributes,
"userAgents": user_agents,
"userAgentsPerInterval": user_agents_per_interval,
"countries": countries,
"countriesPerInterval": countries_per_interval,
"stopDelaySuspended": stop_delay_suspended,
"stopDelayInactive": stop_delay_inactive,
"stopDelayMalicious": stop_delay_malicious,
"scanIntervalAfterSuspended": scan_interval_after_suspended,
"scanIntervalAfterMalicious": scan_interval_after_malicious,
"incidentProfile": incident_profile,
"expireAfter": expire_after,
},
kwargs,
)
)
data = {"incident": incident}

Expand Down Expand Up @@ -124,6 +130,7 @@ def update(
scan_interval_after_malicious: int | None = None,
incident_profile: str | None = None,
expire_after: int | None = None,
**kwargs: Any,
) -> dict:
"""Update specific runtime options of the incident.

Expand All @@ -146,6 +153,7 @@ def update(
scan_interval_after_malicious (int | None, optional): How to change the scan interval after the observable became malicious. Defaults to None.
incident_profile (str | None, optional): ID of the incident profile to use when creating this incident. Defaults to None.
expire_after (int | None, optional): Seconds until the incident will automatically be closed. Defaults to None.
**kwargs: Additional parameters to include in the request payload.

Returns:
dict: Incident body.
Expand All @@ -155,25 +163,28 @@ def update(

"""
incident: dict[str, Any] = _compact(
{
"observable": observable,
"visibility": visibility,
"channels": channels,
"scanInterval": scan_interval,
"scanIntervalMode": scan_interval_mode,
"watchedAttributes": watched_attributes,
"userAgents": user_agents,
"userAgentsPerInterval": user_agents_per_interval,
"countries": countries,
"countriesPerInterval": countries_per_interval,
"stopDelaySuspended": stop_delay_suspended,
"stopDelayInactive": stop_delay_inactive,
"stopDelayMalicious": stop_delay_malicious,
"scanIntervalAfterSuspended": scan_interval_after_suspended,
"scanIntervalAfterMalicious": scan_interval_after_malicious,
"incidentProfile": incident_profile,
"expireAfter": expire_after,
}
_merge(
{
"observable": observable,
"visibility": visibility,
"channels": channels,
"scanInterval": scan_interval,
"scanIntervalMode": scan_interval_mode,
"watchedAttributes": watched_attributes,
"userAgents": user_agents,
"userAgentsPerInterval": user_agents_per_interval,
"countries": countries,
"countriesPerInterval": countries_per_interval,
"stopDelaySuspended": stop_delay_suspended,
"stopDelayInactive": stop_delay_inactive,
"stopDelayMalicious": stop_delay_malicious,
"scanIntervalAfterSuspended": scan_interval_after_suspended,
"scanIntervalAfterMalicious": scan_interval_after_malicious,
"incidentProfile": incident_profile,
"expireAfter": expire_after,
},
kwargs,
)
)
data = {"incident": incident}

Expand Down
43 changes: 27 additions & 16 deletions src/urlscan/pro/livescan.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from urlscan.client import BaseClient, _compact
from urlscan.types import LiveScanResourceType, VisibilityType
from urlscan.utils import _merge


class LiveScan(BaseClient):
Expand Down Expand Up @@ -32,6 +33,7 @@ def task(
extra_headers: dict[str, str] | None = None,
enable_features: list[str] | None = None,
disable_features: list[str] | None = None,
**kwargs: Any,
) -> dict:
"""Task a URL to be scanned.

Expand All @@ -46,6 +48,7 @@ def task(
extra_headers (dict[str, str] | None, optional): Extra HTTP headers. Defaults to None.
enable_features (list[str] | None, optional): Features to enable. Defaults to None.
disable_features (list[str] | None, optional): Features to disable. Defaults to None.
**kwargs: Additional parameters to include in the request payload.

Returns:
dict: Response containing the scan UUID.
Expand All @@ -61,15 +64,18 @@ def task(
}
)
scanner: dict[str, Any] = _compact(
{
"pageTimeout": page_timeout,
"captureDelay": capture_delay,
"extraHeaders": extra_headers,
"enableFeatures": enable_features,
"disableFeatures": disable_features,
}
_merge(
{
"pageTimeout": page_timeout,
"captureDelay": capture_delay,
"extraHeaders": extra_headers,
"enableFeatures": enable_features,
"disableFeatures": disable_features,
},
kwargs,
)
)
data: dict[str, Any] = _compact({"task": task, "scanner": scanner})
data: dict[str, Any] = {"task": task, "scanner": scanner}

res = self._post(f"/api/v1/livescan/{scanner_id}/task/", json=data)
return self._response_to_json(res)
Expand All @@ -85,6 +91,7 @@ def scan(
extra_headers: dict[str, str] | None = None,
enable_features: list[str] | None = None,
disable_features: list[str] | None = None,
**kwargs: Any,
) -> dict:
"""Task a URL to be scanned. The HTTP request will block until the scan has finished.

Expand All @@ -97,6 +104,7 @@ def scan(
extra_headers (dict[str, str] | None, optional): Extra HTTP headers. Defaults to None.
enable_features (list[str] | None, optional): Features to enable. Defaults to None.
disable_features (list[str] | None, optional): Features to disable. Defaults to None.
**kwargs: Additional parameters to include in the request payload.

Returns:
dict: Response containing the scan UUID.
Expand All @@ -112,15 +120,18 @@ def scan(
}
)
scanner: dict[str, Any] = _compact(
{
"pageTimeout": page_timeout,
"captureDelay": capture_delay,
"extraHeaders": extra_headers,
"enableFeatures": enable_features,
"disableFeatures": disable_features,
}
_merge(
{
"pageTimeout": page_timeout,
"captureDelay": capture_delay,
"extraHeaders": extra_headers,
"enableFeatures": enable_features,
"disableFeatures": disable_features,
},
kwargs,
)
)
data: dict[str, Any] = _compact({"task": task, "scanner": scanner})
data: dict[str, Any] = {"task": task, "scanner": scanner}

res = self._post(f"/api/v1/livescan/{scanner_id}/scan/", json=data)
return self._response_to_json(res)
Expand Down
Loading