Builtin Functions
The contents of this page can be generated using:
spaces docsabs()
def abs()Take the absolute value of an int.
abs(0) == 0
abs(-10) == 10
abs(10) == 10
abs(10.0) == 10.0
abs(-12.34) == 12.34all()
def all() -> boolall: returns true if all values in the iterable object have a truth value of true.
all([1, True]) == True
all([1, 1]) == True
all([0, 1, True]) == False
all([True, 1, True]) == True
all([0, 0]) == False
all([0, False]) == False
all([True, 0]) == False
all([1, False]) == Falseany()
def any() -> boolany: returns true if any value in the iterable object have a truth value of true.
any([0, True]) == True
any([0, 1]) == True
any([0, 1, True]) == True
any([0, 0]) == False
any([0, False]) == Falseargs
program()
def program() -> stringReturns argv[0] (program/script name), or empty string if absent.
name = args.program()parser()
def parser(options)Creates a parser specification.
spec = args.parser(name="deploy", options=[...], positional=[...])Args
options:
_args_module_loaded()
def _args_module_loaded() -> NoneTypeInternal no-op hook to mirror module shape.
argv()
def argv() -> listReturns the full argv list passed to the script.
values = args.argv()parse()
def parse(spec)Parses argv according to a parser spec.
On --help or -h, prints usage and exits 0.
On bad input, prints usage + error and exits 2.
parsed = args.parse(spec)Args
spec:
assert_on()
def assert_on(condition, message) -> NoneTypeExits the process with exit status 1 if condition is false.
An optional message can be provided to describe the failure.
When omitted, the error message defaults to "assertion failed".
assert(1 + 1 == 2)
assert(x > 0, "x must be positive")Args
condition: bool:message: string:
checkout
add_asset()
def add_asset() -> NoneTypeAdds a file to the workspace.
content = "Hello. This is the content"
checkout.add_asset(
rule = {"name": "README.md"},
asset = {
"destination": "README.md",
"content": content,
},
)Args
rule: Adictrule definition containingname(str),deps(list),platforms(list),type(str), andhelp(str).asset: Adictof asset details containingcontent(str) anddestination(str).
update_env()
def update_env() -> NoneTypeCreates or updates the environment in the workspace during checkout.
checkout.update_env(
rule = {"name": "update_env"},
env = {
"paths": [],
"system_paths": ["/usr/bin", "/bin"],
"vars": {
"PS1": '"(spaces) $PS1"',
},
"inherited_vars": ["HOME", "SHELL", "USER"],
"optional_inherited_vars": ["TERM"],
"secret_inherited_vars": ["SSH_AUTH_SOCK"],
},
)Args
rule: Adictrule definition containingname(str),deps(list),platforms(list),type(str), andhelp(str).env: Adictcontaining environment details. Variables are execution-phase dependent; they are available in subsequent modules during checkout and fully available duringspaces run. *vars(dict): Environment variables to set. *paths(list): Paths to prepend toPATH. *system_paths(list): Paths appended to the end ofPATH. *inherited_vars(list): Variables fixed from the calling environment at checkout. *secret_inherited_vars(list): Variables inherited on demand with masked log values.
store_value()
def store_value(key, value) -> NoneTypeStores a key-value pair in the workspace settings, namespaced by either a provided path or the calling module’s member path in the workspace.
The value is available immediately after storing and persists across
checkout evaluations. Use workspace.load_value() to retrieve stored values.
checkout.store_value("my_key", {"version": "1.0", "enabled": True})
checkout.store_value("build_count", 42)
checkout.store_value("name", "my_project", path = "my/custom/path")Args
key: string: A string key to identify the stored value.value: Any JSON-compatible value (string, number, bool, list, dict, None).path: string: Optional path to store under. When omitted, the member path for the calling module is used.
set_max_checkout_queue()
def set_max_checkout_queue(count) -> NoneTypeSets the maximum number of concurrent tasks during the checkout phase.
This overrides the default max_queue_count for tasks in the Checkout phase.
checkout.set_max_checkout_queue(8)Args
count: int: Maximum number of concurrent checkout tasks.
add()
def add() -> NoneTypeAdds a rule to organize dependencies.
checkout.add(
rule = {"name": "my_rule", "deps": ["my_other_rule"]},
)Args
rule: Adictcontaining the rule definition (e.g.,name,deps,platforms,type, andhelp).
add_soft_link_asset()
def add_soft_link_asset() -> NoneTypeCreates a symbolic link from a source to a destination path.
checkout.add_soft_link_asset(
rule = {
"name": "symlink_file",
},
asset = {
"source": "path/to/original/file.txt",
"destination": "sysroot/symlink/to/file.txt"
}
)Args
rule: Adictrule definition.asset: Adictcontainingsourceanddestinationpaths for the symbolic link.
add_archive()
def add_archive() -> NoneTypeAdds an archive to the workspace.
checkout.add_archive(
# the rule name is the path in the workspace where the archive will be extracted
rule = {"name": "llvm-project"},
archive = {
"url": "[https://github.com/llvm/llvm-project/archive/refs/tags/llvmorg-](https://github.com/llvm/llvm-project/archive/refs/tags/llvmorg-){}.zip".format(version),
"sha256": "27b5c7c745ead7e9147c78471b9053d4f6fc3bed94baf45f4e8295439f564bb8",
"link": "Hard",
"strip_prefix": "llvm-project-llvmorg-{}".format(version),
"add_prefix": "llvm-project",
},
)Args
rule: Adictrule definition containingname(str),deps(list),platforms(list),type(str), andhelp(str).archive: Adictcontainingurl(str),sha256(str),link(None|Hard),globs(list),strip_prefix(str), andadd_prefix(str).
set_max_run_queue()
def set_max_run_queue(count) -> NoneTypeSets the maximum number of concurrent tasks during the run phase.
This overrides the default max_queue_count for tasks in the Run phase.
checkout.set_max_run_queue(16)Args
count: int: Maximum number of concurrent run tasks.
add_exec()
def add_exec() -> NoneTypeAdds a process to execute during checkout.
checkout.add_exec(
rule = {"name": "my_rule", "deps": ["my_other_rule"]},
exec = {"command": "ls", "arguments": ["-l"]}
)Args
rule: Adictcontaining the rule definition.exec: Adictcontaining the execution details.
add_repo()
def add_repo() -> NoneTypeAdds a git repository to the workspace.
checkout.add_repo(
# the rule name is also the path in the workspace where the clone will be
rule = { "name": "spaces" },
repo = {
"url": "[https://github.com/work-spaces/spaces](https://github.com/work-spaces/spaces)",
"rev": "main",
"checkout": "Revision",
"clone": "Default",
"is_evaluate_spaces_modules": True
}
)Args
rule: rule definition containingrepo: repository details containing
modify_value()
def modify_value(key, modifier) -> NoneTypeModifies a stored value by applying a lambda to the current value.
The current value for key (or None if missing) is passed to modifier.
The lambda return value is stored back in the checkout store.
checkout.modify_value("build_count", lambda current: (current or 0) + 1)
checkout.modify_value(
"settings",
lambda current: {"enabled": True} if current == None else current,
path = "my/custom/path",
)Args
key: string: A string key to identify the stored value.modifier: A lambda/function taking one argument (the current value).path: string: Optional path to store under. When omitted, the member path for the calling module is used.
add_platform_archive()
def add_platform_archive() -> NoneTypeAdds an archive to the workspace based on the platform.
base = {
"add_prefix": "sysroot/bin",
"strip_prefix": "target/release",
"link": "Hard",
}
macos_x86_64 = base | {
"url": "[https://github.com/work-spaces/spaces/releases/download/v0.6.0-beta.13/spaces-macos-latest-x86_64-v0.6.0-beta.13.zip](https://github.com/work-spaces/spaces/releases/download/v0.6.0-beta.13/spaces-macos-latest-x86_64-v0.6.0-beta.13.zip)",
"sha256": "47d325145e6f7f870426f1b123c781f89394b0458bb43f5abe2d36ac3543f7ef",
}
macos_aarch64 = base | {
"url": "[https://github.com/work-spaces/spaces/releases/download/v0.6.0-beta.13/spaces-macos-latest-aarch64-v0.6.0-beta.13.zip](https://github.com/work-spaces/spaces/releases/download/v0.6.0-beta.13/spaces-macos-latest-aarch64-v0.6.0-beta.13.zip)",
"sha256": "6dd972454942faa609670679c53b6876ab8e66bcfd0b583ee5a8d13c93b2e879",
}
linux_x86_64 = base | {
"url": "[https://github.com/work-spaces/spaces/releases/download/v0.6.0-beta.13/spaces-linux-gnu-x86_64-v0.6.0-beta.13.zip](https://github.com/work-spaces/spaces/releases/download/v0.6.0-beta.13/spaces-linux-gnu-x86_64-v0.6.0-beta.13.zip)",
"sha256": "39030124f18b338eceee09061fb305b522ada76f6a0562f9926ea0747b3ad440",
}
linux_aarch64 = base | {
"url": "https://github.com/work-spaces/spaces/releases/download/v0.6.0-beta.13/spaces-linux-gnu-aarch64-v0.6.0-beta.13.zip",
"sha256": "39030124f18b338eceee09061fb305b522ada76f6a0562f9926ea0747b3ad440",
}
windows_x86_64 = base | {
"url": "[https://github.com/work-spaces/spaces/releases/download/v0.6.0-beta.13/spaces-windows-latest-x86_64-v0.6.0-beta.13.exe](https://github.com/work-spaces/spaces/releases/download/v0.6.0-beta.13/spaces-windows-latest-x86_64-v0.6.0-beta.13.exe)",
"sha256": "b93dc96b2c66fcfc4aef851db2064f6e6ecb54b29968ca5174f6b892b99651c8",
}
windows_aarch64 = base | {
"url": "[https://github.com/work-spaces/spaces/releases/download/v0.6.0-beta.13/spaces-windows-latest-aarch64-v0.6.0-beta.13.exe](https://github.com/work-spaces/spaces/releases/download/v0.6.0-beta.13/spaces-windows-latest-aarch64-v0.6.0-beta.13.exe)",
"sha256": "c67c7b23897e0949843e248465d5444428fb287f89dcd45cec76dde4b2cdc6a9",
}
checkout.add_platform_archive(
# rule name is only the path in the workspace if add_prefix is not set
rule = {"name": "spaces"},
platforms = {
"macos-x86_64": macos_x86_64,
"macos-aarch64": macos_aarch64,
"windows-x86_64": windows_x86_64,
"windows-aarch64": windows_aarch64,
"linux-x86_64": linux_x86_64,
},
)Args
rule: Adictrule definition containingname(str),deps(list),platforms(list),type(str), andhelp(str).platforms: Adictof platform keys (e.g.,macos-aarch64,linux-x86_64) mapping to archive detaildicts.
add_env_vars()
def add_env_vars() -> NoneTypeCreates or updates the environment in the workspace during checkout.
checkout.update_env(
rule = {"name": "update_env"},
env = {
"paths": [],
"system_paths": ["/usr/bin", "/bin"],
"vars": {
"PS1": '"(spaces) $PS1"',
},
"inherited_vars": ["HOME", "SHELL", "USER"],
"optional_inherited_vars": ["TERM"],
"secret_inherited_vars": ["SSH_AUTH_SOCK"],
},
)Args
rule: Adictrule definition containingname(str),deps(list),platforms(list),type(str), andhelp(str).any_env:
add_hard_link_asset()
def add_hard_link_asset() -> NoneTypeCreates a hardlink from a source to a destination path.
checkout.add_hard_link_asset(
rule = {
"name": "link_file",
},
asset = {
"source": "path/to/original/file.txt",
"destination": "sysroot/link/to/file.txt"
}
)Args
rule: Adictrule definition.asset: Adictcontainingsourceanddestinationpaths.
add_cargo_bin()
def add_cargo_bin() -> NoneTypeAdds a binary crate using cargo-binstall.
checkout.add_cargo_bin(
rule = {"name": "probe-rs-tools"},
cargo_bin = {
"crate": "probe-rs-tools",
"version": "0.24.0",
"bins": ["probe-rs", "cargo-embed", "cargo-flash"]
},
)Args
rule: Adictrule definition containingname(str),deps(list),platforms(list),type(str), andhelp(str).cargo_bin: Adictof crate details containingcrate(str),version(str), andbins(list).
abort()
def abort(message) -> NoneTypeAbort script evaluation with a message.
checkout.abort("Failed to do something")Args
message: string: Abort message to show the user.
update_asset()
def update_asset() -> NoneTypeCreates or updates an existing file containing structured data in the workspace.
cargo_vscode_task = {
"type": "cargo",
"problemMatcher": ["$rustc"],
"group": "build",
}
# Add some VS code tasks
checkout.update_asset(
rule = {"name": "vscode_tasks"},
asset = {
"destination": ".vscode/tasks.json",
"format": "json",
"value": {
"tasks": [
cargo_vscode_task | {
"command": "build",
"args": ["--manifest-path=spaces/Cargo.toml"],
"label": "build:spaces",
},
cargo_vscode_task | {
"command": "install",
"args": ["--path=spaces", "--root=${userHome}/.local", "--profile=dev"],
"label": "install_dev:spaces",
}
],
},
}
)Args
rule: Adictrule definition containingname(str),deps(list),platforms(list),type(str), andhelp(str).asset: Adictcontainingdestination(str),format(json|toml|yaml), andvalue(dict). Supports multi-rule updates to the same file if keys are unique.
add_target()
def add_target() -> NoneTypeAdds a target to organize dependencies.
checkout.add_target(
rule = {"name": "my_rule", "deps": ["my_other_rule"]},
)This function is deprecated in favor of [checkout.add].
Args
rule: Adictcontaining the rule definition (e.g.,name,deps,platforms,type, andhelp).
add_which_asset()
def add_which_asset() -> NoneTypeAdds a hardlink to an executable file available on the PATH.
checkout.add_which_asset(
rule = { "name": "which_pkg_config" },
asset = {
"which": "pkg-config",
"destination": "sysroot/bin/pkg-config"
}
)Args
rule: Adictrule definition containingname(str),deps(list),platforms(list),type(str), andhelp(str).asset: Adictcontainingwhich(str) anddestination(str). Note: This creates system dependencies that may break workspace hermeticity.
add_any_assets()
def add_any_assets() -> NoneTypeAdds any number of assets, with support for different asset types.
checkout.add_any_assets(
rule = {
"name": "add_multiple_files",
},
assets = [
{ "type": "hardlink", "source": "path/to/file1.txt", "destination": "sysroot/file1.txt" },
{ "type": "symlink", "source": "path/to/file2.txt", "destination": "sysroot/file2.txt" }
]
)Args
rule: Adictrule definition.assets: Alistof asset dictionaries, where each dictionary specifies the asset’s type and its properties (e.g., source and destination).
add_oras_archive()
def add_oras_archive() -> NoneTypeArgs
rule:oras_archive:
chr()
def chr() -> stringchr: returns a string encoding a codepoint.
chr(i) returns a string that encodes the single Unicode code
point whose value is specified by the integer i. chr fails
unless 0 ≤ i ≤ 0x10FFFF.
chr(65) == 'A'
chr(1049) == 'Й'
chr(0x1F63F) == '😿'debug()
def debug(content) -> NoneTypeArgs
content:
dir()
def dir() -> listdir: list attributes of a value.
dir(x) returns a list of the names of the attributes (fields and
methods) of its operand. The attributes of a value x are the names
f such that x.f is a valid expression.
"capitalize" in dir("abc")enumerate()
def enumerate(start) -> listenumerate: return a list of (index, element) from an iterable.
enumerate(x) returns a list of (index, value) pairs, each containing
successive values of the iterable sequence and the index of the
value within the sequence.
The optional second parameter, start, specifies an integer value to
add to each index.
enumerate(["zero", "one", "two"]) == [(0, "zero"), (1, "one"), (2, "two")]
enumerate(["one", "two"], 1) == [(1, "one"), (2, "two")]Args
start: int:
env
get()
def get(name, default)Gets an environment variable by name.
Returns None when the variable is absent and no default was supplied,
allowing callers to distinguish “not set” from “set to empty string”.
When a default is provided it is returned in place of None for missing
variables. Use env.has() as a lighter-weight existence check when the
value itself is not needed.
env.get("PATH") # -> str | None
env.get("PATH", default="/usr/bin") # -> str (never None when default given)Args
name: string:default: string:
which()
def which(name) -> stringFinds the first executable matching the given name in PATH.
Returns an empty string when the command is not found.
On Windows, also checks PATHEXT for recognised executable extensions
(.COM, .EXE, .BAT, .CMD).
If name contains path separators it is treated as a direct path probe
rather than a PATH search.
env.which("git") # -> str (empty string if not found)Args
name: string:
all()
def all()Returns all environment variables as a dictionary.
Non-UTF-8 keys or values are included with invalid bytes replaced by
the Unicode replacement character (U+FFFD) via lossy conversion.
This is consistent with the behaviour of cwd() and path_list().
vars = env.all() # -> dict[str, str]has()
def has(name) -> boolReturns whether an environment variable is present.
env.has("CI")Args
name: string:
cwd()
def cwd() -> stringReturns the current working directory.
Non-UTF-8 path components are replaced by the Unicode replacement character (U+FFFD) via lossy conversion.
env.cwd()chdir()
def chdir(path) -> NoneTypeChanges the current working directory.
env.chdir("subdir")Args
path: string:
path_list()
def path_list() -> listSplits PATH into a list of directory entries.
Handles the platform-specific separator (: on Unix/macOS, ; on Windows).
Non-UTF-8 path components are replaced by the Unicode replacement character
(U+FFFD) via lossy conversion.
env.path_list()path_join_entries()
def path_join_entries(entries) -> stringJoins a list of directory paths into a PATH-style string.
Uses the platform separator (: on Unix/macOS, ; on Windows).
This is the inverse of path_list(): use it to rebuild PATH after
modifying the list, avoiding hard-coded platform separators.
Returns an error if any entry contains the platform separator character.
env.path_join_entries(["/usr/bin", "/usr/local/bin"])
# -> "/usr/bin:/usr/local/bin" (Unix/macOS)Args
entries: list:
which_all()
def which_all(name) -> listFinds all executables matching the given name in PATH.
env.which_all("python") # -> list[str]Args
name: string:
eprint()
def eprint(content) -> NoneTypePrints a string to standard error, followed by a newline.
eprint("error: something went wrong")Args
content: string:
fail()
def fail()fail: fail the execution
fail("this is an error") # fail: this is an error
fail("oops", 1, False) # fail: oops 1 Falsefs
append_string_to_file()
def append_string_to_file() -> NoneTypeAppends a string to the end of a file at the specified path.
Args
path: string:content: string:
write_json_from_dict()
def write_json_from_dict(path, value) -> NoneTypeArgs
path: string:value:pretty: bool:
is_file()
def is_file(path) -> boolReturns true if the given path is a file.
Args
path: string:
read_bytes()
def read_bytes(path) -> listArgs
path: string:
read_toml_to_dict()
def read_toml_to_dict(path)Reads a TOML file and returns its contents as a dictionary.
Args
path: string:
is_symlink()
def is_symlink(path) -> boolReturns true if the given path is a symbolic link.
Args
path: string:
is_text_file()
def is_text_file(path) -> boolReturns true if the given path is a text file (valid UTF-8 with no NUL bytes).
Args
path: string:
read_lines()
def read_lines(path) -> listArgs
path: string:
write_lines()
def write_lines(path, lines) -> NoneTypeArgs
path: string:lines:
write_toml_from_dict()
def write_toml_from_dict(path, value) -> NoneTypeArgs
path: string:value:
exists()
def exists(path) -> boolReturns true if the given path exists.
Args
path: string:
write_yaml_from_dict()
def write_yaml_from_dict(path, value) -> NoneTypeArgs
path: string:value:
set_permissions()
def set_permissions(path) -> NoneTypeArgs
path: string:mode: int:
read_yaml_to_dict()
def read_yaml_to_dict(path)Reads a YAML file and returns its contents as a dictionary.
Args
path: string:
write_string_to_file()
def write_string_to_file() -> NoneTypeWrites a string to a file at the specified path relative to the workspace root.
Args
path: string:content: string:
symlink()
def symlink(target, link) -> NoneTypeArgs
target: string:link: string:
metadata()
def metadata(path)Args
path: string:
modified()
def modified(path) -> floatArgs
path: string:
write_string_atomic()
def write_string_atomic(path, content) -> NoneTypeArgs
path: string:content: string:mode: int:
touch()
def touch(path) -> NoneTypeArgs
path: string:create: bool:update_mtime: bool:
write_bytes()
def write_bytes(path, data) -> NoneTypeArgs
path: string:data:
is_directory()
def is_directory(path) -> boolReturns true if the given path is a directory.
Args
path: string:
move()
def move(src, dst) -> NoneTypeArgs
src: string:dst: string:overwrite: bool:
chown()
def chown(path) -> NoneTypeArgs
path: string:user: string:group: string:
chmod()
def chmod(path, spec) -> NoneTypeArgs
path: string:spec: string:
copy()
def copy(src, dst) -> NoneTypeArgs
src: string:dst: string:recursive: bool:overwrite: bool:follow_symlinks: bool:
read_file_to_string()
def read_file_to_string(path) -> stringReads the contents of a file and returns it as a string.
Args
path: string:
read_json_to_dict()
def read_json_to_dict(path)Reads a JSON file and returns its contents as a dictionary.
Args
path: string:
size()
def size(path) -> intArgs
path: string:
read_directory()
def read_directory(path) -> listReads the contents of a directory and returns a list of paths.
Args
path: string:
remove()
def remove(path) -> NoneTypeArgs
path: string:recursive: bool:missing_ok: bool:
read_link()
def read_link(path) -> stringArgs
path: string:
mkdir()
def mkdir(path) -> NoneTypeArgs
path: string:parents: bool:exist_ok: bool:
getattr()
def getattr()getattr: returns the value of an attribute
getattr(x, name) returns the value of the attribute (field or method)
of x named name. It is a dynamic error if x has no such attribute.
getattr(x, "f") is equivalent to x.f.
getattr(x, "f", d) is equivalent to x.f if hasattr(x, "f") else d
and will never raise an error.
getattr("banana", "split")("a") == ["b", "n", "n", ""] # equivalent to "banana".split("a")hasattr()
def hasattr() -> boolhasattr: test if an object has an attribute
hasattr(x, name) reports whether x has an attribute (field or method)
named name.
hash
compute_sha256_from_file()
def compute_sha256_from_file(file_path) -> stringComputes the SHA-256 checksum for the contents of a file.
The file is read in 64 KiB streaming chunks so large files do not cause excessive memory consumption.
checksum = hash.compute_sha256_from_file("data/model.bin")
print(f"File SHA-256: {checksum}")Args
file_path: string:
md5_string()
def md5_string(input) -> stringComputes MD5 for a string, returning a 32-character hex digest.
MD5 is cryptographically broken; do not use for security purposes. Provided for legacy compatibility and non-critical checksums only.
Args
input: string:
md5_file()
def md5_file(file_path) -> stringComputes MD5 for a file, streaming in 64 KiB chunks.
MD5 is cryptographically broken; do not use for security purposes.
Args
file_path: string:
base64_decode()
def base64_decode(input) -> stringDecodes a standard-alphabet base64 string into raw bytes and returns them as a UTF-8 string.
Returns an error if the base64 is malformed or the decoded bytes are not valid UTF-8.
decoded = hash.base64_decode("SGVsbG8sIFdvcmxkIQ==") # "Hello, World!"Args
input: string:
blake3_string()
def blake3_string(input) -> stringComputes BLAKE3 for a string, returning a 64-character hex digest.
BLAKE3 is a modern cryptographic hash function that is significantly faster than SHA-256/SHA-512 while offering equivalent security. It supports arbitrary-length output; this function returns the default 256-bit (64 hex character) digest.
digest = hash.blake3_string("my-data")
print(f"BLAKE3: {digest}")Args
input: string:
sha1_file()
def sha1_file(file_path) -> stringComputes SHA-1 for a file, streaming in 64 KiB chunks.
SHA-1 is cryptographically weak; prefer SHA-256 for new applications.
Args
file_path: string:
sha256_file()
def sha256_file(file_path) -> stringAlias of compute_sha256_from_file.
The file is read in 64 KiB streaming chunks.
Args
file_path: string:
crc32_file()
def crc32_file(file_path) -> stringComputes CRC32 for a file, streaming in 64 KiB chunks.
Returns a zero-padded 8-character hex string. CRC32 is not a cryptographic hash; use only for accidental-corruption detection.
Args
file_path: string:
sha512_string()
def sha512_string(input) -> stringComputes SHA-512 for a string, returning a 128-character hex digest.
Args
input: string:
sha256_string()
def sha256_string(input) -> stringAlias of compute_sha256_from_string.
Args
input: string:
sha1_string()
def sha1_string(input) -> stringComputes SHA-1 for a string, returning a 40-character hex digest.
SHA-1 is cryptographically weak; prefer SHA-256 for new applications.
Args
input: string:
hex_encode()
def hex_encode(bytes) -> stringHex-encodes the raw bytes of a string.
encoded = hash.hex_encode("Hello") # "48656c6c6f"Args
bytes: string:
sha512_file()
def sha512_file(file_path) -> stringComputes SHA-512 for a file, streaming in 64 KiB chunks.
Args
file_path: string:
compute_sha256_from_string()
def compute_sha256_from_string(input) -> stringComputes the SHA-256 checksum for a given string.
text_hash = hash.compute_sha256_from_string("my-unique-identity")
print(f"String Hash: {text_hash}")Args
input: string:
blake3_file()
def blake3_file(file_path) -> stringComputes BLAKE3 for a file, streaming in 64 KiB chunks.
Returns a 64-character lowercase hex digest. The file is never fully loaded into memory, making this safe for large files.
digest = hash.blake3_file("data/model.bin")
print(f"BLAKE3: {digest}")Args
file_path: string:
crc32_string()
def crc32_string(input) -> stringComputes CRC32 for a string, returning a zero-padded 8-character hex string.
CRC32 is not a cryptographic hash; use only for accidental-corruption detection, never for security.
Args
input: string:
hex_decode()
def hex_decode(input) -> stringDecodes a hex string into raw bytes and returns them as a UTF-8 string.
Returns an error if the hex is malformed or the decoded bytes are not valid UTF-8.
decoded = hash.hex_decode("48656c6c6f") # "Hello"Args
input: string:
base64_encode()
def base64_encode(bytes) -> stringBase64-encodes the raw bytes of a string (standard alphabet, with padding).
encoded = hash.base64_encode("Hello, World!") # "SGVsbG8sIFdvcmxkIQ=="Args
bytes: string:
info
is_ci()
def is_ci() -> boolReturns true if the --ci flag was passed on the command line.
if info.is_ci():
# ...Returns
bool: True if the--ciflag is present, False otherwise.
is_platform_windows()
def is_platform_windows() -> boolReturns true if the current platform is Windows.
if info.is_platform_windows():
# ...Returns
bool: True if the platform iswindows-x86_64orwindows-aarch64, False otherwise.
is_platform_aarch64()
def is_platform_aarch64() -> boolReturns true if the current platform architecture is aarch64.
if info.is_platform_aarch64():
# ...Returns
bool: True if the platform architecture isaarch64, False otherwise.
get_supported_platforms()
def get_supported_platforms() -> listReturns a list of all platforms supported by the system.
supported = info.get_supported_platforms()
if "macos-aarch64" in supported:
print("This system supports Apple Silicon.")Returns
list[str]: A list of supported platform identifiers, such asmacos-aarch64,linux-x86_64, etc.
get_cpu_count()
def get_cpu_count() -> intReturns the number of CPUs on the current machine.
num_cpus = info.get_cpu_count()Returns
int: The total number of logical CPU cores available.
check_required_semver()
def check_required_semver(required) -> boolChecks if the current version of spaces satisfies the given semver requirement.
is_compatible = info.check_required_semver("^2.1.0")
if not is_compatible:
# ...Args
required: string: The semantic version requirement string to check against.
Returns
bool: True if the current version satisfies the requirement, False otherwise.
get_path_to_spaces_tools()
def get_path_to_spaces_tools() -> stringReturns the path to the spaces tools directory.
tools_path = info.get_path_to_spaces_tools()Returns
str: The absolute path to the spaces tools directory.
set_required_semver()
def set_required_semver(required) -> NoneTypeSets the semantic version of spaces required to run the workspace.
info.set_required_semver("^2.1.0")Args
required: string: The semantic version requirement string (e.g., “^2.1.0”).
get_log_divider_string()
def get_log_divider_string() -> stringReturns a string representing the end of the log header.
divider = info.get_log_divider_string()Returns
str: The standard divider string used to separate log headers from the body.
parse_log_file()
def parse_log_file(path)Parses a log file into its YAML header and message lines.
log_data = info.parse_log_file("outputs/build.log")
for line in log_data['lines']:
# ...Args
path: string: The absolute or relative path to the spaces log file.
Returns
dict: A dictionary containingheader(dict) with the parsed YAML metadata andlines(list[str]) with the body of the log.
get_platform_name()
def get_platform_name() -> stringReturns the name of the current operating system and architecture platform.
platform = info.get_platform_name()
if "linux" in platform:
# ...Returns
str: The platform identifier, such asmacos-aarch64,macos-x86_64,linux-x86_64,linux-aarch64,windows-x86_64, orwindows-aarch64.
is_platform_linux()
def is_platform_linux() -> boolReturns true if the current platform is Linux.
if info.is_platform_linux():
# ...Returns
bool: True if the platform islinux-x86_64orlinux-aarch64, False otherwise.
set_minimum_version()
def set_minimum_version(version) -> NoneTypeSets the minimum version of spaces required to run the script.
info.set_minimum_version("1.2.0")Args
version: string: The minimum version string (e.g., “1.5.0”) required.
set_max_queue_count()
def set_max_queue_count(_count) -> NoneTypeSets the maximum number of items that can be queued at one time.
This is deprecated. Use checkout.set_max_run_queue() and checkout.set_max_checkout_queue() instead.
info.set_max_queue_count(10)Args
_count: int:
is_platform_x86_64()
def is_platform_x86_64() -> boolReturns true if the current platform architecture is x86_64.
if info.is_platform_x86_64():
# ...Returns
bool: True if the platform architecture isx86_64, False otherwise.
abort()
def abort(message) -> NoneTypeAbort script evaluation with a message.
info.abort("Failed to do something")Args
message: string: Abort message to show the user.
get_path_to_store()
def get_path_to_store() -> stringReturns the path to the spaces store.
store_path = info.get_path_to_store()Returns
str: The absolute path to the local spaces store directory.
get_execution_phase()
def get_execution_phase() -> stringReturns the current execution phase of the system.
phase = info.get_execution_phase()
if phase == "Run":
# ...Returns
str: The current phase, which will be “Run”, “Checkout”, or “Inspect”.
is_platform_macos()
def is_platform_macos() -> boolReturns true if the current platform is macOS.
if info.is_platform_macos():
# ...Returns
bool: True if the platform ismacos-aarch64ormacos-x86_64, False otherwise.
json
is_string_json()
def is_string_json(value) -> boolReturns true if the given string is valid JSON.
if json.is_string_json('{"key": "value"}'):
print("Valid JSON")Args
value: string: The string to check.
Returns
bool: True if the string is valid JSON, False otherwise.
try_string_to_dict()
def try_string_to_dict(content)Tries to convert a JSON formatted string into a Starlark dictionary/value.
On success, returns the parsed Starlark value. On failure, returns the
value supplied to default (which itself defaults to None).
This is preferred over calling is_string_json followed by
string_to_dict because it parses the input only once. A named
default parameter lets callers supply a sentinel that is
distinguishable from a successfully decoded JSON null:
MISSING = "PARSE_FAILED"
result = json.try_string_to_dict(raw, default = MISSING)
if result == MISSING:
print("input was not valid JSON")
elif result == None:
print("input was the JSON literal null")
else:
print(result["key"])When called without default the behaviour is identical to the
original: None is returned on parse failure.
raw_data = '{"id": 101, "status": "active"}'
result = json.try_string_to_dict(raw_data)
if result != None:
print(result["status"])
else:
print("Failed to parse JSON")Args
content: string: The JSON-formatted string to be converted.default: (named, optional): Value to return when parsing fails. Defaults toNone.
Returns
dict | <default>: A Starlark value representing the parsed JSON, or thedefaultvalue if parsing fails.
to_string_indented()
def to_string_indented(value) -> stringConverts a dictionary or Starlark value into a pretty-printed JSON string with a configurable indentation width.
This is the flexible alternative to to_string_pretty, allowing you to
control exactly how many spaces are used for each indentation level.
Use indent = 0 to get compact output with newlines but no indentation
(unusual but valid). NaN and Infinity values are rejected with an error.
data = {"key": "value", "nums": [1, 2, 3]}
# 4-space indentation
wide = json.to_string_indented(data, indent = 4)
# 1-space indentation
narrow = json.to_string_indented(data, indent = 1)Args
value: The dictionary or Starlark value to be serialized.indent: int: Number of spaces to use per indentation level (0–16).
Returns
str: The formatted, multi-line JSON string with the requested indentation.
to_string()
def to_string(value) -> stringConverts a dictionary or Starlark value into a JSON-formatted string.
This is the inverse of string_to_dict. It takes structured data
and serializes it into a string, making it ready to be written
to a file or sent over a network.
data = {"name": "Project Alpha", "version": 1}
json_string = json.to_string(data)Args
value: The dictionary or Starlark value to be serialized.
Returns
str: The JSON string representation of the input value.
string_to_dict()
def string_to_dict(content)Converts a JSON formatted string into a Python dictionary.
This function acts as a parser, taking a raw JSON string and transforming it into a structured dictionary that you can easily manipulate in your scripts.
raw_data = '{"id": 101, "status": "active"}'
data_dict = json.string_to_dict(raw_data)
print(data_dict["status"])Args
content: string: The JSON-formatted string to be converted.
Returns
dict: A dictionary representation of the JSON data.
to_string_pretty()
def to_string_pretty(value) -> stringConverts a dictionary or Starlark value into a “pretty-printed” JSON string.
Unlike to_string, this version adds newlines and indentation to make
the output easily readable by humans. The default indentation is 2 spaces.
Use to_string_indented if you need a different indentation width.
data = {"project": "Gemini", "active": True, "tags": ["ai", "helper"]}
pretty_json = json.to_string_pretty(data)
print(pretty_json)Args
value: The dictionary or Starlark value to be serialized.
Returns
str: The formatted, multi-line JSON string indented with 2 spaces.
len()
def len() -> intlen: get the length of a sequence
len(x) returns the number of elements in its argument.
It is a dynamic error if its argument is not a sequence.
len(()) == 0
len({}) == 0
len([]) == 0
len([1]) == 1
len([1,2]) == 2
len({'16': 10}) == 1
len(True) # error: not supportedlog
set_format()
def set_format(format) -> NoneTypeSet the log format.
log.set_format("text") # human-readable (default)
log.set_format("json") # structured JSONArgs
format: string:
debug()
def debug(message) -> NoneTypeLog at debug level.
log.debug("Debug message")Args
message: string:
info()
def info(message) -> NoneTypeLog at info level.
log.info("Info message")Args
message: string:
trace()
def trace(message) -> NoneTypeLog at trace level.
log.trace("Trace message")Args
message: string:
fatal()
def fatal(message) -> NoneTypeLog at error level and abort execution.
log.fatal("Fatal error message")Args
message: string:
set_level()
def set_level(level) -> NoneTypeSet the log level.
log.set_level("trace")
log.set_level("debug")
log.set_level("info")
log.set_level("warn")
log.set_level("error")
log.set_level("off")Args
level: string:
warn()
def warn(message) -> NoneTypeLog at warn level.
log.warn("Warning message")Args
message: string:
error()
def error(message) -> NoneTypeLog at error level.
log.error("Error message")Args
message: string:
max()
def max()max: returns the maximum of a sequence.
max(x) returns the greatest element in the iterable sequence x.
It is an error if any element does not support ordered comparison, or if the sequence is empty.
The optional named parameter key specifies a function to be applied
to each element prior to comparison.
max([3, 1, 4, 1, 5, 9]) == 9
max("two", "three", "four") == "two" # the lexicographically greatest
max("two", "three", "four", key=len) == "three" # the longestArgs
key:
min()
def min()min: returns the minimum of a sequence.
min(x) returns the least element in the iterable sequence x.
It is an error if any element does not support ordered comparison, or if the sequence is empty.
min([3, 1, 4, 1, 5, 9]) == 1
min("two", "three", "four") == "four" # the lexicographically least
min("two", "three", "four", key=len) == "two" # the shortestArgs
key:
ord()
def ord() -> intord: returns the codepoint of a character
ord(s) returns the integer value of the sole Unicode code point
encoded by the string s.
If s does not encode exactly one Unicode code point, ord fails.
Each invalid code within the string is treated as if it encodes the
Unicode replacement character, U+FFFD.
Example:
ord("A") == 65
ord("Й") == 1049
ord("😿") == 0x1F63Fpath
join()
def join(parts) -> stringJoins path segments using the platform separator.
p = path.join(["a", "b", "c.txt"]) # "a/b/c.txt" (or "a\\b\\c.txt" on Windows)Args
parts: list:
dirname()
def dirname(path) -> stringReturns the directory portion of the path.
Args
path: string:
absolute()
def absolute(path) -> stringReturns an absolute path, resolving against current working directory for relative paths.
Args
path: string:
parent()
def parent(path, n) -> stringReturns the n-th parent (default n=1).
path.parent("a/b/c", 2) # "a"Args
path: string:n: int:
separator()
def separator() -> stringReturns the platform path separator.
components()
def components(path) -> listReturns normalized path components as strings.
Args
path: string:
normalize()
def normalize(path) -> stringNormalizes path components lexically (collapses repeated separators, . and .. where possible). Does not touch filesystem or resolve symlinks.
Args
path: string:
basename()
def basename(path) -> stringReturns the final path component.
Args
path: string:
relative_to()
def relative_to(target, base) -> stringComputes target relative to base.
path.relative_to("a/b/c", "a/d") # "../b/c"Args
target: string:base: string:
extension()
def extension(path) -> stringReturns the file extension without dot.
Args
path: string:
with_extension()
def with_extension(path, ext) -> stringReplaces the file extension.
path.with_extension("a.txt", "md") # "a.md"Args
path: string:ext: string:
is_absolute()
def is_absolute(path) -> boolReturns true if path is absolute.
Args
path: string:
expand_user()
def expand_user(path) -> stringExpands leading ~ to home directory.
Args
path: string:
_path_module_loaded()
def _path_module_loaded() -> NoneTypeConvenience no-op to mirror other modules that expose at least one side-effect-free utility.
canonicalize()
def canonicalize(path) -> stringCanonicalizes a path (resolves symlinks, . and ..) on the real filesystem.
Args
path: string:
expand_vars()
def expand_vars(path) -> stringExpands $VAR and ${VAR} tokens from process environment.
Args
path: string:
split()
def split(path) -> tupleSplits a path into (dirname, basename).
d, b = path.split("a/b/c.txt") # ("a/b", "c.txt")Args
path: string:
stem()
def stem(path) -> stringReturns the file stem (filename without final extension).
Args
path: string:
print()
def print(content) -> NoneTypePrints a string to standard output, followed by a newline.
print("hello, world")Args
content: string:
process
pipeline()
def pipeline(steps)Execute commands serially, piping stdout of each into stdin of the next.
Input: list[RunOptions] Output: {“status”: int, “stdout”: str, “stderr”: str, “duration_ms”: int}
Args
steps:
is_running()
def is_running(handle) -> boolReturns true if the process associated with the handle is still running.
Args
handle: int:
run()
def run(options)Streaming-capable run with explicit redirection and timeout/check behavior.
New Parameters (optional)
stdout_path– If set, writes the command’s captured stdout to this file path (creating/truncating the file). The returnedstdoutfield still contains the captured string.stderr_path– If set, writes the command’s captured stderr to this file path (creating/truncating the file). The returnedstderrfield still contains the captured string.tee– WhenTrue, also forwards stdout/stderr to the calling process’s stdout/stderr after capturing.
Args
options:
kill()
def kill(handle, signal) -> boolSend a signal to a background process.
Supported values:
- “SIGTERM” (default): graceful terminate
- “SIGKILL”: hard kill
Args
handle: int:signal: string:
capture()
def capture(argv) -> string$(...)-style helper: run a command and return trimmed stdout. Raises on non-zero status.
Args
argv:
exec()
def exec(exec)Executes a process and captures its output and status.
Args
exec:
spawn()
def spawn(options) -> intSpawn a background process and return an opaque numeric handle.
Example: handle = process.spawn({“command”: “server”, “args”: ["–port", “8080”]})
Args
options:
wait()
def wait(handle, timeout_ms)Wait for background process completion.
Returns: {“status”: int, “stdout”: str, “stderr”: str, “duration_ms”: int}
Args
handle: int:timeout_ms: int:
repr()
def repr() -> stringrepr: formats its argument as a string.
All strings in the result are double-quoted.
repr(1) == '1'
repr("x") == "\"x\""
repr([1, "x"]) == "[1, \"x\"]"
repr("test \"'") == "\"test \\\"'\""
repr("x\"y😿 \\'") == "\"x\\\"y\\U0001f63f \\\\'\""reversed()
def reversed() -> listreversed: reverse a sequence
reversed(x) returns a new list containing the elements of the iterable
sequence x in reverse order.
reversed(['a', 'b', 'c']) == ['c', 'b', 'a']
reversed(range(5)) == [4, 3, 2, 1, 0]
reversed("stressed".elems()) == ["d", "e", "s", "s", "e", "r", "t", "s"]
reversed({"one": 1, "two": 2}.keys()) == ["two", "one"]rlog
trace()
def trace(message) -> NoneTypeLog a trace-level message on the active console.
rlog.trace("starting checkout")Args
message: string:
debug()
def debug(message) -> NoneTypeLog a debug-level message on the active console.
rlog.debug("resolved version 1.2.3")Args
message: string:
message()
def message(message) -> NoneTypeLog a high-level user-facing message on the active console.
rlog.message("--Building--")Args
message: string:
info()
def info(message) -> NoneTypeLog an informational message on the active console.
rlog.info("workspace ready")Args
message: string:
warn()
def warn(message) -> NoneTypeQueue a deferred warning to be displayed at the end of the run.
rlog.warn("deprecated rule used")Args
message: string:
error()
def error(message) -> NoneTypeLog an error-level message on the active console.
rlog.error("something went wrong")Args
message: string:
run
add_kill_exec()
def add_kill_exec() -> NoneTypeAdds a rule that will kill the execution of another rule.
run.add_kill_exec(
rule = {"name": "stop_service", "type": "Run"},
kill = {
"signal": "Terminate",
"target": "my_long_running_service",
"expect": "Success",
},
)Args
rule: Rule definition containingname(str),deps(list),platforms(list),type(str), andhelp(str).kill: Kill details containingsignal(Hup|Int|Quit|Abort|Kill|Alarm|Terminate|User1|User2),target(str), andexpect(Failure|Success|Any).
add_exec()
def add_exec() -> NoneTypeAdds a rule that will execute a process.
run.add_exec(
rule = {
"name": name,
"type": "Setup",
"deps": ["sysroot-python:venv"],
},
exec = {
"command": "pip3",
"args": ["install"] + packages,
},
)Args
rule: Rule definition containingname(str),deps(list),platforms(list),type(str), andhelp(str).exec: Execution details containingcommand(str),args(list),env(dict),working_directory(str),expect(Failure|Success|Any), andredirect_stdout(str).
abort()
def abort(message) -> NoneTypeAbort script evaluation with a message.
run.abort("Failed to do something")Args
message: string: Abort message to show the user.
add()
def add() -> NoneTypeAdds a rule that depends on other rules but doesn’t execute any command.
There is no specific action for the rule, but this rule can be useful for organizing dependencies.
run.add(
rule = {
"name": "my_rule",
"deps": ["my_other_rule"],
},
)Args
rule: Rule definition containingname(str),deps(list),platforms(list),type(str), andhelp(str).
add_target()
def add_target() -> NoneTypeAdds a rule that depends on other rules.
This rule will be deprecated in favor of run.add.
Args
rule: Rule definition containingname(str),deps(list),platforms(list),type(str), andhelp(str).
add_archive()
def add_archive() -> NoneTypeAdds a rule that will archive a directory.
run.add_archive(
rule = {"name": name, "type": "Optional", "deps": ["sysroot-python:venv"]},
archive = {
"input": "build/install",
"name": "my_archive",
"version": "1.0",
"driver": "tar.gz",
},
)Args
rule: Rule definition containingname(str),deps(list),platforms(list),type(str), andhelp(str).archive: Archive details containinginput(str),name(str),version(str),driver(tar.gz|tar.bz2|zip|tar.7z|tar.xz),platform(str),includes(list), andexcludes(list).
add_from_clone()
def add_from_clone() -> NoneTypeAdds a rule that will execute based on the cloned from rule.
The new rule is merged with the cloned rule: the new rule’s fields take precedence, and the exec/target is taken from the cloned rule.
run.add_from_clone(
rule = {
"name": "my_new_rule",
"deps": ["other_dep"],
},
clone_from = "existing_rule",
)Args
rule: Rule definition containingname(str),deps(list),type(str), andhelp(str).clone_from: string: The name of an existing rule whose exec will be cloned.
script
get_arg()
def get_arg(offset) -> stringRetrieves a specific command-line argument by its index.
If no argument exists at the given offset, an empty string is returned.
first_arg = script.get_arg(0)
print(f"First argument: {first_arg}")Args
offset: int: The positional index of the argument (0-based).
Returns
str: The argument value or an empty string.
abort()
def abort(message) -> NoneTypeAborts execution immediately.
This function terminates the script with a non-zero exit code (indicating failure) and prints the provided message to the standard error stream (stderr).
script.abort("Failed to do something")Args
message: string: The error message to display upon termination.
print()
def print(content) -> NoneTypeOutputs a string to the standard output (stdout).
This is intended for use within script execution to provide feedback or data to the user.
script.print("Hello, world!")Args
content: string: The message to print.
get_args()
def get_args()Parses command-line arguments into structured categories.
This function separates “ordered” arguments (standalone values) from “named” arguments (key=value pairs).
args = script.get_args()
for arg in args["ordered"]:
print(arg)Returns
dict: A dictionary containingordered(list[str]) andnamed(dict).
set_exit_code()
def set_exit_code(exit_code) -> NoneTypeSets the final exit code for the script without terminating it.
Use 0 for success and non-zero for failure. The script will continue
executing until it reaches the end or an abort call.
script.set_exit_code(1)Args
exit_code: int: The integer exit code to be returned upon completion.
semver
is_valid_version()
def is_valid_version(version) -> boolValidates that the given string is a well-formed semantic version.
if semver.is_valid_version("1.2.3"):
# ...Args
version: string: The version string to validate (e.g.,"1.2.3","1.2.3-rc.1+build.5").
Returns
bool: True if the string is a valid semantic version, False otherwise.
bump_minor()
def bump_minor(version) -> stringIncrements the minor component of a version, resetting patch, pre, and build.
next_minor = semver.bump_minor("1.2.3")
# next_minor == "1.3.0"Args
version: string: The version string to bump.
Returns
str: The bumped version.
compare()
def compare(lhs, rhs) -> intCompares two semantic versions.
order = semver.compare("1.2.3", "1.2.4")
# order == -1Args
lhs: string: The first version string.rhs: string: The second version string.
Returns
int: -1 iflhs < rhs, 0 if equal, 1 iflhs > rhs.
is_valid_requirement()
def is_valid_requirement(requirement) -> boolValidates that the given string is a well-formed semantic version requirement.
if semver.is_valid_requirement("^1.2.0"):
# ...Args
requirement: string: The requirement string to validate (e.g.,"^1.2.0",">=1.0, <2.0","*").
Returns
bool: True if the string is a valid semver requirement, False otherwise.
matches_all()
def matches_all(version, requirements) -> boolReturns true if the given version satisfies all of the given requirements.
if semver.matches_all("1.2.5", ["^1.2.0", ">=1.2.4"]):
# ...Args
version: string: The semantic version string.requirements: list: A list of semver requirement strings, all of which must be satisfied.
Returns
bool: True if the version satisfies every requirement, False otherwise.
bump_major()
def bump_major(version) -> stringIncrements the major component of a version, resetting minor, patch, pre, and build.
next_major = semver.bump_major("1.2.3-rc.1")
# next_major == "2.0.0"Args
version: string: The version string to bump.
Returns
str: The bumped version.
extract_all_versions()
def extract_all_versions(name) -> listExtracts every semantic version found in the given string, in the order they appear.
Optionally accepts a list of suffixes to strip from the end of name
before scanning. Suffixes are stripped repeatedly until none match, so
passing e.g. [".gz", ".tar"] reduces "foo-1.2.3.tar.gz" to "foo-1.2.3"
before the regex runs. This is useful when archive extensions would
otherwise be greedily consumed as part of a pre-release identifier.
versions = semver.extract_all_versions("upgrade 1.2.3 to 2.0.0")
# versions == ["1.2.3", "2.0.0"]
versions = semver.extract_all_versions(
"my-tool-1.2.3-rc.1.tar.gz",
suffixes = [".tar.gz", ".zip"],
)
# versions == ["1.2.3-rc.1"]Args
name: string: The string to scan for versions.suffixes: list: Optional list of suffixes to strip from the end ofnamebefore scanning. Stripping is applied repeatedly until no suffix matches.
Returns
list[str]: All valid semantic versions found in the input.
extract_version()
def extract_version(name)Extracts the first semantic version found anywhere in the given string.
Useful for parsing a version out of a package name, archive filename, or tag.
The version may appear as any substring of the input (e.g., "foo-1.2.3",
"libthing_2.0.0-rc.1.tar.gz", "v1.2.3+build.5").
Optionally accepts a list of suffixes to strip from the end of name
before scanning. Suffixes are stripped repeatedly until none match, so
passing e.g. [".tar.gz"] reduces "my-tool-1.2.3-rc.1.tar.gz" to
"my-tool-1.2.3-rc.1" before the regex runs. This is useful when archive
extensions would otherwise be greedily consumed as part of a pre-release
identifier.
version = semver.extract_version(
"my-tool-1.2.3-rc.1.tar.gz",
suffixes = [".tar.gz"],
)
# version == "1.2.3-rc.1"Args
name: string: The package name (or any string) to scan for a version.suffixes: list: Optional list of suffixes to strip from the end ofnamebefore scanning. Stripping is applied repeatedly until no suffix matches.
Returns
str | None: The first valid semantic version found, orNoneif none is present.
parse()
def parse(version)Parses a semantic version string into its component parts.
parts = semver.parse("1.2.3-rc.1+build.5")
# parts == {"major": 1, "minor": 2, "patch": 3, "pre": "rc.1", "build": "build.5"}Args
version: string: The version string to parse.
Returns
dict: A dictionary withmajor(int),minor(int),patch(int),pre(str), andbuild(str).
min()
def min(versions) -> stringReturns the minimum version from a list of semantic versions.
oldest = semver.min(["1.2.0", "1.10.0", "1.2.10"])
# oldest == "1.2.0"Args
versions: list: A non-empty list of version strings.
Returns
str: The smallest version in the list.
validate_requirements()
def validate_requirements(requirements) -> NoneTypeValidates a list of semver requirements, returning an error for the first invalid entry.
semver.validate_requirements(["^1.0", ">=2.0"])Args
requirements: list: The list of semver requirement strings to validate.
bump_patch()
def bump_patch(version) -> stringIncrements the patch component of a version, resetting pre and build.
next_patch = semver.bump_patch("1.2.3")
# next_patch == "1.2.4"Args
version: string: The version string to bump.
Returns
str: The bumped version.
resolve_all()
def resolve_all(versions, requirements) -> listReturns a list of versions that satisfy all of the given requirements, sorted in descending order (highest first).
candidates = semver.resolve_all(
["1.0.0", "1.2.0", "1.2.5", "2.0.0"],
["^1.0"],
)
# candidates == ["1.2.5", "1.2.0", "1.0.0"]Args
versions: list: The list of available version strings.requirements: list: The list of semver requirement strings that returned versions must satisfy.
Returns
list[str]: All matching versions, sorted from highest to lowest.
sort()
def sort(versions) -> listSorts a list of semantic versions in ascending order.
Invalid versions cause an error.
versions = semver.sort(["1.10.0", "1.2.0", "1.2.10"])
# versions == ["1.2.0", "1.2.10", "1.10.0"]Args
versions: list: The list of version strings to sort.
Returns
list[str]: The sorted list of versions.
resolve()
def resolve(versions, requirements)Resolves the highest version from a list of available versions that satisfies all of the given requirements.
resolved = semver.resolve(
["1.0.0", "1.2.0", "1.2.5", "2.0.0"],
["^1.0", ">=1.2"],
)
# resolved == "1.2.5"Args
versions: list: The list of available version strings to choose from.requirements: list: The list of semver requirement strings that the chosen version must satisfy.
Returns
str | None: The highest matching version, orNoneif no version satisfies the requirements.
is_prerelease()
def is_prerelease(version) -> boolReturns true if the version has a pre-release identifier (e.g., 1.2.3-rc.1).
if semver.is_prerelease("1.2.3-rc.1"):
# ...Args
version: string: The version string to test.
Returns
bool: True if the version is a pre-release, False otherwise.
filter()
def filter(versions, requirements) -> listFilters a list of versions to those that satisfy all of the given requirements.
The returned versions preserve the order they appear in the input.
matching = semver.filter(["1.0.0", "1.2.0", "2.0.0"], ["^1.0"])
# matching == ["1.0.0", "1.2.0"]Args
versions: list: The list of available version strings.requirements: list: The list of semver requirement strings; each version must satisfy all of them.
Returns
list[str]: The subset ofversionsthat satisfy every requirement.
max()
def max(versions) -> stringReturns the maximum version from a list of semantic versions.
latest = semver.max(["1.2.0", "1.10.0", "1.2.10"])
# latest == "1.10.0"Args
versions: list: A non-empty list of version strings.
Returns
str: The greatest version in the list.
matches()
def matches(version, requirement) -> boolReturns true if the given version satisfies the given requirement.
if semver.matches("1.2.5", "^1.2.0"):
# ...Args
version: string: The semantic version string.requirement: string: The semver requirement string.
Returns
bool: True if the version satisfies the requirement, False otherwise.
sh
lines()
def lines(command) -> listRuns a shell command and returns its stdout split into individual lines.
Each line of the command’s standard output becomes one element of the returned list. A trailing empty line (the newline after the last output line) is not included in the result.
The command is executed by the platform shell (/bin/sh -c on
Unix, cmd.exe /C on Windows).
⚠ Shell-Injection Warning
Do not interpolate untrusted input into command. See the module
documentation for details and safe alternatives.
Errors
Returns an error if check=True (the default) and the command exits
with a non-zero status, or if the process cannot be spawned.
Example
files = sh.lines("ls -1")
for f in files:
print(f)
# Safely handle commands that may find nothing
matches = sh.lines("grep -rl 'TODO' src/", check=False)Args
command: string: – Shell command string to execute.check: bool: – IfTrue(the default), return an error when the command exits with a non-zero status.cwd: string: – Optional working directory for the command.
Returns
A list[str] — one element per output line. Returns an empty list when the command produces no output.
exit_code()
def exit_code(command) -> intRuns a shell command and returns only its numeric exit code.
This function never returns an error for a non-zero exit status — it only fails if the process cannot be spawned or waited on. It is useful for conditional logic where the output of the command is not needed.
The command is executed by the platform shell (/bin/sh -c on
Unix, cmd.exe /C on Windows).
⚠ Shell-Injection Warning
Do not interpolate untrusted input into command. See the module
documentation for details and safe alternatives.
Errors
Returns an error only if the process cannot be spawned or waited on.
Example
code = sh.exit_code("test -f config.json")
if code == 0:
print("config exists")Args
command: string: – Shell command string to execute.cwd: string: – Optional working directory for the command.
Returns
The command’s exit code as an int (0 = success, non-zero = failure). Returns 1 if the process terminates without a numeric exit code (e.g. killed by a signal on Unix).
run()
def run(command)Runs a shell command and returns its exit status, stdout, and stderr.
The command is executed by the platform shell:
- Unix:
/bin/sh -c <command> - Windows:
cmd.exe /C <command>
Shell features such as pipes (|), redirections (>, 2>&1),
semicolons, and globs are fully supported.
⚠ Shell-Injection Warning
Do not interpolate untrusted input into command. See the module
documentation for details and safe alternatives.
Errors
Returns an error if check=True and the command exits with a non-zero
status, or if the process cannot be spawned.
Example
result = sh.run("cat *.log | grep ERROR | wc -l", check=True)
print(result["stdout"])
# Capture stderr alongside stdout
result = sh.run("some_tool 2>&1")
print(result["stdout"])Args
command: string: – Shell command string to execute.check: bool: – IfTrue, return an error when the command exits with a non-zero status. Defaults toFalse.cwd: string: – Optional working directory for the command.
Returns
A dict with: - status (int): exit code of the command (0 = success). - stdout (str): captured standard output. - stderr (str): captured standard error.
capture()
def capture(command) -> stringRuns a shell command and returns its stdout as a trimmed string.
Trailing newlines and carriage returns are stripped from the output. This is the most convenient function when you only need the output of a command and want errors to abort immediately.
The command is executed by the platform shell (/bin/sh -c on
Unix, cmd.exe /C on Windows).
⚠ Shell-Injection Warning
Do not interpolate untrusted input into command. See the module
documentation for details and safe alternatives.
Errors
Returns an error if check=True (the default) and the command exits
with a non-zero status, or if the process cannot be spawned.
Example
head = sh.capture("git rev-parse HEAD")
# Suppress errors and fall back to a default
branch = sh.capture("git rev-parse --abbrev-ref HEAD", check=False)
if not branch:
branch = "unknown"Args
command: string: – Shell command string to execute.check: bool: – IfTrue(the default), return an error when the command exits with a non-zero status. Set toFalseto ignore failures and return whatever output was produced.cwd: string: – Optional working directory for the command.
Returns
The command’s stdout with trailing whitespace stripped.
sorted()
def sorted() -> listsorted: sort a sequence
sorted(x) returns a new list containing the elements of the iterable
sequence x, in sorted order. The sort algorithm is stable.
The optional named parameter reverse, if true, causes sorted to
return results in reverse sorted order.
The optional named parameter key specifies a function of one
argument to apply to obtain the value’s sort key.
The default behavior is the identity function.
sorted([3, 1, 4, 1, 5, 9]) == [1, 1, 3, 4, 5, 9]
sorted([3, 1, 4, 1, 5, 9], reverse=True) == [9, 5, 4, 3, 1, 1]
sorted(["two", "three", "four"], key=len) == ["two", "four", "three"] # shortest to longest
sorted(["two", "three", "four"], key=len, reverse=True) == ["three", "four", "two"] # longest to shortestArgs
key:reverse: bool:
string
kebab_case()
def kebab_case(s) -> stringArgs
s: string:
to_upper()
def to_upper(s) -> stringArgs
s: string:
format_table()
def format_table(rows) -> stringArgs
rows: list:
starts_with()
def starts_with(s, prefix) -> boolArgs
s: string:prefix: string:
regex_match()
def regex_match(pattern, s)Args
pattern: string:s: string:
trim_start()
def trim_start(s) -> stringArgs
s: string:
trim_end()
def trim_end(s) -> stringArgs
s: string:
ends_with()
def ends_with(s, suffix) -> boolArgs
s: string:suffix: string:
replace()
def replace(s, from, to, count, regex, ignore_case) -> stringArgs
s: string:from: string:to: string:count: int:regex: bool:ignore_case: bool:
to_lower()
def to_lower(s) -> stringArgs
s: string:
camel_case()
def camel_case(s) -> stringArgs
s: string:
trim()
def trim(s) -> stringArgs
s: string:
regex_captures()
def regex_captures(pattern, s)Args
pattern: string:s: string:
split_lines()
def split_lines(s) -> listArgs
s: string:
regex_find_all()
def regex_find_all(pattern, s)Args
pattern: string:s: string:
split_whitespace()
def split_whitespace(s) -> listArgs
s: string:
snake_case()
def snake_case(s) -> stringArgs
s: string:
pad_left()
def pad_left(s, n, fill) -> stringArgs
s: string:n: int:fill: string:
title_case()
def title_case(s) -> stringArgs
s: string:
contains()
def contains(s, needle, ignore_case) -> boolArgs
s: string:needle: string:ignore_case: bool:
pad_right()
def pad_right(s, n, fill) -> stringArgs
s: string:n: int:fill: string:
sys
arch()
def arch() -> stringReturns the current CPU architecture.
sys.arch() # "x86_64" | "aarch64" | ...hostname()
def hostname() -> stringReturns the hostname for the current machine, if available.
sys.hostname()cpu_count()
def cpu_count() -> intReturns the logical CPU count.
sys.cpu_count()executable()
def executable() -> stringReturns the current executable path.
sys.executable()endianness()
def endianness() -> stringReturns host byte order.
sys.endianness() # "little" | "big"user_home()
def user_home() -> stringReturns the current user’s home directory path.
sys.user_home()is_ci()
def is_ci() -> boolReturns true when running in common CI environments.
sys.is_ci()exit()
def exit(code) -> intExits the program with the specified exit code.
sys.exit(0) # Exit successfully
sys.exit(1) # Exit with error codeArgs
code: int:
total_memory_bytes()
def total_memory_bytes() -> intReturns total system memory in bytes.
sys.total_memory_bytes()os()
def os() -> stringReturns the current operating system name.
sys.os() # "linux" | "macos" | "windows"username()
def username() -> stringReturns the current username, if available.
sys.username()text
scan_lines()
def scan_lines(content, callback) -> listSplit a string into lines and invoke a callback for each line.
This function splits a string by newline characters (handling both \n and \r\n) and
invokes a callback for each line. This is useful for processing text content already in memory.
Example
# Count non-empty lines
content = fs.read_file("data.txt")
non_empty = text.scan_lines(
content,
lambda line, num: 1 if line.strip() else None
)
count = len(non_empty)Args
content: string: - The string to split into linescallback: - A function that takes(line: string, line_number: int)and returns any value. ReturnNoneto exclude the result from the output list.
Returns
A list containing all non-None values returned by the callback function.
head()
def head(path, n) -> listReturn the first n lines of a file.
Reads and returns the first n lines from a file, similar to the Unix head command.
Stops reading after n lines for efficiency.
Example
# Get the first 5 lines of a log file
first_lines = text.head("app.log", 5)Args
path: string: - Path to the filen: int: - Number of lines to read (must be >= 0)
Returns
A list of strings containing the first n lines (newlines are stripped).
match_to_diagnostic()
def match_to_diagnostic(options)Convert a regex match result to a diagnostic.
This function takes a RegexMatchResult (from regex_scan_tagged or similar functions)
and converts it to a DiagnosticResult by extracting named capture groups.
This is more efficient than manually extracting fields in Starlark code.
Named Capture Groups
The function looks for these named groups in the match:
file- File pathline- Line number (1-based)column- Column number (1-based)end_line- End line number (1-based)end_column- End column number (1-based)code- Error/warning codemessage- Diagnostic message
Example
matches = text.regex_scan_tagged(
content,
{
"patterns": [
{"tag": "error", "pattern": r"(?P<file>\S+):(?P<line>\d+): (?P<message>.*)$"}
]
}
)
diags = [
text.match_to_diagnostic({
"match": m,
"severity": "error",
"source": "mycompiler"
})
for m in matches
]Args
options: - A dictionary with the following keys: *match(dict, required): A regex match result fromregex_scan_taggedor similar *severity(string, optional): Severity level (“error”, “warning”, “info”, “hint”, or “note”). Default: “error” *default_message(string, optional): Message to use if no “message” capture group exists. Default: uses the full match string *default_file(string, optional): File to use if no “file” capture group exists. If omitted, file is not rendered. *source(string, optional): Source identifier for the diagnostic (e.g., “eslint”, “rustc”)related: - Optional related diagnostics
Returns
A diagnostic dictionary that can be rendered with render_diagnostics.
render_diagnostics()
def render_diagnostics(diagnostics) -> stringRender a list of diagnostics in various formats.
Converts a list of diagnostic dictionaries into a formatted string suitable for display or consumption by CI/CD tools. Related diagnostics are rendered inline after their parent diagnostic to provide additional context.
Example
diagnostics = []
diagnostics.append(text.diagnostic({
"file": "src/main.py",
"severity": "error",
"message": "Syntax error",
"line": 10,
"column": 5
}))
# For console output
print(text.render_diagnostics(diagnostics, format="human"))
# For GitHub Actions
print(text.render_diagnostics(diagnostics, format="github"))
# For tools that consume SARIF
fs.write_file("results.sarif", text.render_diagnostics(diagnostics, format="sarif"))Args
diagnostics: list: - A list of diagnostic dictionaries (created withdiagnostic())format: string: - Output format (default: “human”): *"human": Human-readable format like “file.py:10:5: error: message”. Related diagnostics are indented with " “. *"github": GitHub Actions workflow commands format (creates annotations). Related diagnostics are emitted as separate commands. *"json": Pretty-printed JSON array (includes all fields including related) *"sarif": SARIF 2.1.0 format (Static Analysis Results Interchange Format)
Returns
A formatted string representation of the diagnostics.
diagnostic()
def diagnostic(options)Create a standardized diagnostic dictionary.
This function creates a properly formatted diagnostic that can be rendered in various formats
(human-readable, GitHub Actions, JSON, SARIF) using render_diagnostics(). Diagnostics are
used to report errors, warnings, and other issues found during linting, building, or testing.
Example
diag = text.diagnostic({
"file": "src/main.py",
"severity": "error",
"message": "Undefined variable 'x'",
"line": 42,
"column": 10,
"code": "E0602",
"source": "pylint"
})Args
options: - A dictionary with the following keys: *file(string, required): Path to the file where the issue was found *severity(string, required): One of: “error”, “warning”, “info”, “hint”, “note” *message(string, required): Description of the issue *line(int, optional): Line number where the issue occurs (1-based, must be >= 1) *column(int, optional): Column number where the issue starts (1-based, must be >= 1) *end_line(int, optional): Line number where the issue ends (1-based, must be >= 1) *end_column(int, optional): Column number where the issue ends (1-based, must be >= 1) *code(string, optional): Error code or rule identifier (e.g., “E501”, “no-unused-vars”) *source(string, optional): Name of the tool that generated this diagnostic (e.g., “pylint”, “eslint”)related: - Optional list of related diagnostics (for additional context)
Returns
A dictionary representing the diagnostic with all provided fields.
grep()
def grep(options) -> listSearch for lines in a file matching a regex pattern.
Searches through a file line-by-line and returns information about lines matching (or not matching)
a regular expression pattern. Similar to the Unix grep command but returns structured data.
Example
# Find all error lines with a severity level
matches = text.grep({
"path": "app.log",
"pattern": r"ERROR \[(?P<severity>\w+)\]",
"max": 100
})
for m in matches:
print("Line {}: severity={}".format(m["line"], m["named"]["severity"]))Args
options: - A dictionary with the following keys: *path(string, required): Path to the file to search *pattern(string, required): Regular expression pattern to match *ignore_case(bool, optional): If true, perform case-insensitive matching (default: false) *invert(bool, optional): If true, return lines that don’t match (default: false) *max(int, optional): Maximum number of matches to return (default: unlimited)
Returns
A list of dictionaries, one per matching line, with the following keys: * line (int): Line number (1-based) * text (string): The full text of the matching line * match (string): The portion of the line that matched the pattern * named (dict): Dictionary of named capture groups from the regex
regex_scan()
def regex_scan(content, patterns) -> listScan content for multiple regex patterns simultaneously.
This function efficiently searches through content for multiple regex patterns at once, returning detailed information about all matches found. It uses a RegexSet internally for efficient multi-pattern matching.
Example
log_content = fs.read_file("app.log")
matches = text.regex_scan(log_content, [
r"ERROR: (?P<msg>.*)",
r"WARN: (?P<msg>.*)",
r"FATAL: (?P<msg>.*)"
])
for m in matches:
print("Pattern {}: {}".format(m["pattern_index"], m["named"]["msg"]))Args
content: string: - The string to searchpatterns: list: - A list of regex pattern strings to search for
Returns
A list of dictionaries, one per match, with the following keys: * pattern_index (int): Index of the pattern that matched (0-based) * line (int): Line number where the match occurred (1-based) * column (int): Column number where the match starts (1-based, character offset) * match (string): The text that matched * named (dict): Dictionary of named capture groups from the regex
scan_file()
def scan_file(options, callback) -> listStream a file line-by-line and invoke a callback for each line.
This function reads a file from disk line-by-line without loading the entire file into memory, making it efficient for processing large files. For each line, it invokes a callback function and collects non-None results.
Example
# Find lines containing "error"
errors = text.scan_file(
{"path": "app.log"},
lambda line, num: {"line": num, "text": line} if "error" in line.lower() else None
)Args
options: - A dictionary with the following keys: *path(string, required): Path to the file to scan *encoding(string, optional): Encoding to use. Either “utf-8” (default, strict) or “lossy” (replaces invalid UTF-8) *strip_newline(bool, optional): Whether to strip newline characters from each line (default: true)callback: - A function that takes(line: string, line_number: int)and returns any value. ReturnNoneto exclude the result from the output list.
Returns
A list containing all non-None values returned by the callback function.
regex_scan_tagged()
def regex_scan_tagged(content, options) -> listScan content for multiple tagged regex patterns.
Similar to regex_scan() but allows associating a custom tag with each pattern.
This makes it easier to identify which type of pattern matched without tracking indices.
Example
log_content = fs.read_file("app.log")
matches = text.regex_scan_tagged(log_content, {
"patterns": [
{"tag": "error", "pattern": r"ERROR: (?P<msg>.*)"},
{"tag": "warning", "pattern": r"WARN: (?P<msg>.*)"},
{"tag": "fatal", "pattern": r"FATAL: (?P<msg>.*)"}
],
"first_match_only": True
})
for m in matches:
print("{}: {}".format(m["tag"], m["named"]["msg"]))Args
content: string: - The string to searchoptions: - A dictionary with: *patterns(list): A list of dictionaries, each with: *tag(string): A custom identifier for this pattern *pattern(string): The regex pattern to match *first_match_only(bool, optional): If true, match each line to at most one pattern where the first pattern to match wins. Defaults to false.
Returns
A list of dictionaries, one per match, with the following keys: * tag (string): The tag associated with the matched pattern * line (int): Line number where the match occurred (1-based) * column (int): Column number where the match starts (1-based, character offset) * match (string): The text that matched * named (dict): Dictionary of named capture groups from the regex
tail()
def tail(path, n) -> listReturn the last n lines of a file.
Reads and returns the last n lines from a file, similar to the Unix tail command.
The entire file is scanned, but only the last n lines are kept in memory.
Example
# Get the last 10 lines of a log file
last_lines = text.tail("app.log", 10)Args
path: string: - Path to the filen: int: - Number of lines to read (must be >= 0)
Returns
A list of strings containing the last n lines (newlines are stripped).
dedup_diagnostics()
def dedup_diagnostics(diagnostics) -> listRemove duplicate diagnostics from a list.
Compares diagnostics based on their complete JSON representation and removes duplicates, preserving only the first occurrence of each unique diagnostic. This is useful when combining diagnostics from multiple sources that may report the same issue.
Example
all_diags = pylint_diags + mypy_diags + flake8_diags
unique_diags = text.dedup_diagnostics(all_diags)Args
diagnostics: list: - A list of diagnostic dictionaries
Returns
A new list containing only unique diagnostics, in the order they first appeared.
read_line_range()
def read_line_range(path, start, end) -> listRead a range of lines from a file.
Reads lines from start to end (inclusive, 1-based indexing). This is efficient for
extracting a specific section of a file without reading the entire file into memory.
Example
# Read lines 10-20 from a file
lines = text.read_line_range("data.txt", 10, 20)
for line in lines:
print(line)Args
path: string: - Path to the filestart: int: - First line to read (1-based, must be >= 1)end: int: - Last line to read (1-based, inclusive, must be >= start)
Returns
A list of strings, one per line (newlines are stripped).
scan_windows_file()
def scan_windows_file(path, n, callback) -> listSlide a window of consecutive lines over a file.
Similar to scan_windows() but reads from a file on disk, processing it in a streaming fashion
without loading the entire file into memory. This is efficient for large files.
Example
# Find error blocks that span 5 lines
errors = text.scan_windows_file(
"large.log",
5,
lambda window, line: {"start": line, "text": window} if "ERROR" in window[0] else None
)Args
path: string: - Path to the file to scann: int: - Window size in lines (must be >= 1)callback: - A function that takes(window: list[string], start_line: int)and returns any value. The window contains up tonlines, andstart_lineis the 1-based line number of the first line. ReturnNoneto exclude the result from the output list.
Returns
A list containing all non-None values returned by the callback function.
regex_scan_file()
def regex_scan_file(path, patterns) -> listScan a file for multiple regex patterns simultaneously.
Similar to regex_scan() but reads from a file on disk in a streaming fashion,
making it efficient for large files.
Example
matches = text.regex_scan_file("large.log", [
r"ERROR: (?P<msg>.*)",
r"FATAL: (?P<msg>.*)"
])Args
path: string: - Path to the file to scanpatterns: list: - A list of regex pattern strings to search for
Returns
A list of dictionaries, one per match, with the following keys: * pattern_index (int): Index of the pattern that matched (0-based) * line (int): Line number where the match occurred (1-based) * column (int): Column number where the match starts (1-based, character offset) * match (string): The text that matched * named (dict): Dictionary of named capture groups from the regex
dedent()
def dedent(content) -> stringRemove common leading whitespace from all non-empty lines.
This function analyzes all non-empty lines to find the minimum indentation level,
then removes that amount of leading whitespace from each line. This is useful for
processing indented text blocks. Equivalent to Python’s textwrap.dedent().
Example
code = '''
def hello():
print("world")
'''
dedented = text.dedent(code)
# Result:
# def hello():
# print("world")Args
content: string: - The string to dedent
Returns
A new string with common leading whitespace removed.
scan_windows()
def scan_windows(content, n, callback) -> listSlide a window of consecutive lines over a string.
This function processes a string by sliding a window of n consecutive lines over it,
invoking a callback for each window position. This is useful for analyzing patterns that
span multiple lines (e.g., function definitions, error blocks).
Example
# Find function definitions (assume they span 3 lines)
content = fs.read_file("code.py")
functions = text.scan_windows(
content,
3,
lambda window, line: line if "def " in window[0] else None
)Args
content: string: - The string to processn: int: - Window size in lines (must be >= 1)callback: - A function that takes(window: list[string], start_line: int)and returns any value. The window contains up tonlines, andstart_lineis the 1-based line number of the first line. ReturnNoneto exclude the result from the output list.
Returns
A list containing all non-None values returned by the callback function.
regex_scan_tagged_file()
def regex_scan_tagged_file(path, options) -> listScan a file for multiple tagged regex patterns.
Similar to regex_scan_tagged() but reads from a file on disk in a streaming fashion,
making it efficient for large files.
Example
matches = text.regex_scan_tagged_file("large.log", {
"patterns": [
{"tag": "error", "pattern": r"ERROR: (?P<msg>.*)"},
{"tag": "fatal", "pattern": r"FATAL: (?P<msg>.*)"}
],
"first_match_only": True
})Args
path: string: - Path to the file to scanoptions: - A dictionary with: *patterns(list): A list of dictionaries, each with: *tag(string): A custom identifier for this pattern *pattern(string): The regex pattern to match *first_match_only(bool, optional): If true, match each line to at most one pattern where the first pattern to match wins. Defaults to false.
Returns
A list of dictionaries, one per match, with the following keys: * tag (string): The tag associated with the matched pattern * line (int): Line number where the match occurred (1-based) * column (int): Column number where the match starts (1-based, character offset) * match (string): The text that matched * named (dict): Dictionary of named capture groups from the regex
line_count()
def line_count(path) -> intReturn the number of lines in a file.
Counts the number of lines by counting newline characters. If the file is non-empty and doesn’t end with a newline, the last line is still counted. This function reads the file in chunks for efficiency.
Example
count = text.line_count("large_file.txt")
print("File has {} lines".format(count))Args
path: string: - Path to the file
Returns
The number of lines in the file as an integer.
time
timer_drop()
def timer_drop(timer_id) -> NoneTypeRemoves a timer handle from the registry.
Args
timer_id: int:
timer_reset()
def timer_reset(timer_id) -> NoneTypeResets a timer handle to start counting from now.
Args
timer_id: int:
unix()
def unix() -> intReturns current unix timestamp in seconds.
format()
def format(secs, fmt) -> stringFormats unix timestamp (seconds) using strftime format.
Args
secs: int:fmt: string:
monotonic_ms()
def monotonic_ms() -> intReturns process-local monotonic milliseconds for duration measurement.
sleep_seconds()
def sleep_seconds(seconds) -> NoneTypePauses execution for the specified number of whole seconds.
Note: uses integer seconds to match supported Starlark argument types.
Args
seconds: int:
unix_ms()
def unix_ms() -> intReturns current unix timestamp in milliseconds.
parse()
def parse(s, fmt) -> intParses a datetime string using strftime format and returns unix seconds.
If timezone is omitted, UTC is assumed.
Args
s: string:fmt: string:
iso8601()
def iso8601() -> stringReturns current UTC time as ISO8601 / RFC3339 string.
now()
def now() -> tupleReturns the current system time as a tuple.
(secs, nsec) = time.now()timer()
def timer() -> intCreates a timer handle and returns its integer id.
Example: h = time.timer()
… work …
ms = time.timer_elapsed_ms(h)
timer_elapsed_ns()
def timer_elapsed_ns(timer_id) -> intReturns elapsed nanoseconds for a timer handle.
Args
timer_id: int:
sleep()
def sleep(nanoseconds) -> NoneTypePauses execution for the specified number of nanoseconds.
Args
nanoseconds: int:
sleep_ms()
def sleep_ms(milliseconds) -> NoneTypePauses execution for the specified number of milliseconds.
Args
milliseconds: int:
timer_elapsed_ms()
def timer_elapsed_ms(timer_id) -> intReturns elapsed milliseconds for a timer handle.
Args
timer_id: int:
tmp
cleanup_all()
def cleanup_all() -> NoneTypeClean up all tracked temporary resources that are not marked keep=true.
Resources created with tmp.dir_keep are skipped. All other tracked
resources are removed from the registry and deleted from disk before
this function returns.
Unlike a fail-fast approach, cleanup continues for every entry even if an individual deletion fails; all errors are then reported together so that no entry is silently leaked.
Example: tmp.cleanup_all()
dir_keep()
def dir_keep() -> stringCreate a temporary directory that will NOT be automatically cleaned up.
Identical to tmp.dir except that tmp.cleanup_all() will leave this
directory in place. Useful for caches or artefacts that must survive
the script.
Returns the created directory path as a string.
Example: d = tmp.dir_keep(prefix = “cache-”)
Args
prefix: string:
file()
def file() -> stringCreate a temporary file and register it for later cleanup.
The file is created in the system’s default temp location with a randomly generated name. On Unix the file is created with mode 0600 (owner-only access). The file starts empty.
Returns the created file path as a string.
Example: f = tmp.file(suffix = “.log”)
Args
suffix: string:
dir()
def dir() -> stringCreate a temporary directory and register it for later cleanup.
The directory is created in the system’s default temp location using cryptographically random bytes for the unique part of its name, so collisions are not possible in practice. On Unix the directory is created with mode 0700 (owner-only access).
Returns the created directory path as a string.
Example: d = tmp.dir(prefix = “build-”)
Args
prefix: string:
cleanup()
def cleanup(path) -> NoneTypeImmediately clean up a single tracked temporary resource.
Pass the path string that was returned by tmp.dir, tmp.dir_keep,
or tmp.file. Raises an error if the path is not tracked.
Example: f = tmp.file(suffix = “.txt”)
… use f …
tmp.cleanup(f)
Args
path: string:
toml
is_string_toml()
def is_string_toml(value) -> boolReturns True if the given string is valid TOML, False otherwise.
if toml.is_string_toml('key = "value"'):
print("Valid TOML")Args
value: string: The string to check.
Returns
bool:Trueif the string parses as valid TOML,Falseotherwise.
try_string_to_dict()
def try_string_to_dict(content)Attempts to convert a TOML formatted string into a Starlark dictionary/value. Returns the default value (defaulting to None) if parsing fails instead of propagating an error.
On success, returns the parsed Starlark value. On failure, returns the
value supplied to default (which itself defaults to None).
A named default parameter lets callers supply a sentinel distinguishable
from a successful but empty parse result:
MISSING = "PARSE_FAILED"
result = toml.try_string_to_dict(raw, default = MISSING)
if result == MISSING:
print("input was not valid TOML")
else:
print(result["key"])When called without default the behaviour is: None is returned on
parse failure.
raw_data = 'id = 101\nstatus = "active"'
result = toml.try_string_to_dict(raw_data)
if result != None:
print(result["status"])
else:
print("Failed to parse TOML")Notes
- TOML datetime values are returned as ISO 8601 strings.
- TOML special floats (
inf,-inf,nan) cause an error even in this try variant because the failure occurs post-parse during conversion.
Args
content: string: The TOML-formatted string to be converted.default: (named, optional): Value to return when parsing fails. Defaults toNone.
Returns
dict | <default>: A Starlark value representing the parsed TOML, or thedefaultvalue if parsing fails.
to_string()
def to_string(value) -> stringConverts a dictionary or Starlark value into a TOML-formatted string.
data = {"name": "Project Alpha", "version": 1}
toml_string = toml.to_string(data)Args
value: The dictionary or Starlark value to be serialized.
Returns
str: The TOML string representation of the input value.
string_to_dict()
def string_to_dict(content)Converts a TOML formatted string into a Starlark dictionary/value.
raw_data = 'id = 101\nstatus = "active"'
data_dict = toml.string_to_dict(raw_data)
print(data_dict["status"])Notes
- TOML datetime values (
1979-05-27T07:32:00Z,1979-05-27,07:32:00) are returned as ISO 8601 strings because Starlark has no datetime type. - TOML special float literals (
inf,-inf,nan) are not supported and will cause an error.
Args
content: string: The TOML-formatted string to be converted.
Returns
dict: A dictionary representation of the TOML data.
to_string_pretty()
def to_string_pretty(value) -> stringConverts a dictionary or Starlark value into a pretty-printed TOML string.
data = {"project": "Gemini", "active": True, "tags": ["ai", "helper"]}
pretty_toml = toml.to_string_pretty(data)
print(pretty_toml)Args
value: The dictionary or Starlark value to be serialized.
Returns
str: The formatted TOML string.
workspace
set_default_module_visibility_private()
def set_default_module_visibility_private() -> NoneTypeSets the default visibility to private for the current module.
workspace.set_default_module_visibility_private()set_env()
def set_env() -> NoneTypeSets the workspace environment.
This is meant for internal use only from the env.spaces.star module.
workspace.set_env(
env = {
"vars": {"CC": "clang", "CXX": "clang++"},
},
)Args
env: Environment definition containingvars(dict),paths(list), andinherited(list).
set_locks()
def set_locks() -> NoneTypeSets the workspace locks.
This is meant for internal use only from a lock module.
workspace.set_locks(
locks = {"my_lock": "lock_value"},
)Args
locks: A dictionary of lock names to lock values.
get_env_var()
def get_env_var(var_name) -> stringReturns the value of a workspace environment variable.
home_dir = workspace.get_env_var("HOME")Args
var_name: string: The name of the environment variable.
Returns
str: The value of the environment variable.
get_path_to_home()
def get_path_to_home() -> stringReturns the relative workspace path to the workspce HOME directory.
Returns an error if no active workspace is found in the evaluator context.
get_path_to_log_file()
def get_path_to_log_file(rule) -> stringReturns the relative workspace path to the log file for the specified target.
log_file = workspace.get_path_to_log_file("build_service")Args
rule: string: The name of the target rule.
Returns
str: The relative path to the log file within the workspace.
is_env_var_set()
def is_env_var_set(var_name) -> boolReturns true if the workspace environment variable is set.
if workspace.is_env_var_set("DEBUG_MODE"):
# ...Args
var_name: string: The name of the environment variable to check.
Returns
bool: True if the variable exists in the workspace environment, False otherwise.
get_path_to_build_archive()
def get_path_to_build_archive() -> stringReturns the path to where run.add_archive() creates the output archive.
archive_info = {
"input": "dist",
"name": "release_pkg",
"version": "2.1.0",
"driver": "zip",
}
path = workspace.get_path_to_build_archive(rule_name = "package_rule", archive = archive_info)Args
rule_name: string: The name of the rule used to create the archive.archive: The archive info used to create the archive.
Returns
str: The path to the generated output archive.
get_absolute_path()
def get_absolute_path() -> stringReturns the absolute path to the workspace.
workspace_path = workspace.get_absolute_path()Returns
str: The absolute path to the workspace.
is_path_to_member_available()
def is_path_to_member_available() -> boolReturns true if the workspace satisfies the specified member requirements.
member_req = {
"url": "https://github.com/example/repo.git",
"required": {"Revision": "a1b2c3d4e5f6"}
}
if workspace.is_path_to_member_available(member = member_req):
# ...Args
member: The requirements for the member, containingurl(str) andrequired(dict).
Returns
bool: True if the workspace contains a member matching the requirements, False otherwise.
is_env_var_set_to()
def is_env_var_set_to(var_name, var_value) -> boolReturns true if the workspace environment variable is set.
if workspace.is_env_var_set_to("DEBUG_MODE", "ON"):
# ...Args
var_name: string: The name of the environment variable to check.var_value: string: The expected value of the environment variable.
Returns
bool: True if the variable exists and is equal to the expected value, False otherwise.
load_values()
def load_values(key)Loads all values matching a key from the checkout store.
This returns values for key from the underlying checkout store that
workspace.load_value() reads from, across all URLs and paths.
values = workspace.load_values("my_key")
for item in values:
print(item["url"], item["path"], item["value"])Args
key: string: The string key to look up in all checkout store entries.
Returns
list[dict]: A list of dictionaries withurl(str),path(str), andvalue(JSON-compatible value).
get_path_to_shell_config()
def get_path_to_shell_config() -> stringReturns the path to the shell config file.
shell_config = workspace.get_path_to_shell_config()Returns
str: The path to the shell configuration file used by the workspace.
set_always_evaluate()
def set_always_evaluate(always_evaluate) -> NoneTypeSets whether the workspace should always evaluate scripts.
workspace.set_always_evaluate(True)Args
always_evaluate: bool: If True, scripts will always be evaluated regardless of caching.
is_reproducible()
def is_reproducible() -> boolReturns true if the workspace is reproducible.
This function is deprecated and always returns False.
if workspace.is_reproducible():
# ...Returns
bool: False (deprecated functionality).
get_path_to_checkout()
def get_path_to_checkout() -> stringReturns the repository path in the workspace of the calling script.
script_location = workspace.get_path_to_checkout()Returns
str: The path to the directory containing the current script.
load_value()
def load_value(key)Loads a value from the checkout store.
Values can be set in three ways, listed from highest to lowest priority:
- Command line:
--store=KEY=VALUE(oncheckout,checkout-repo,co,sync, orrun). These are stored with path//and url<command line>, and always take precedence over all other sources regardless of theurlorpathargument. co.spaces.toml: Thestoretable in a[entry.Repo]or[entry.Workflow]section. TOML values preserve their types (strings, integers, bools, arrays, tables).checkout.store_value(): Called from starlark scripts during checkout or sync.
Returns the stored value associated with the given key, or None if the key
does not exist. Values are namespaced by the value written by
checkout.store_value(..., path = ...), or by member path when no
path is provided.
Exactly one of url or path may be specified. If neither is given, all
members are searched and the first match is returned. Note that command-line
values are checked first and returned immediately if the key matches,
before consulting url or path.
# Load from a specific member URL (command-line values still take priority)
value = workspace.load_value("my_key", url = "https://github.com/example/repo")
# Load from a specific path in the workspace
value = workspace.load_value("my_key", path = "spaces")
# Search all members for the key, returning the first match
value = workspace.load_value("my_key")Args
key: string: The string key to look up.url: Optional member URL to load from (ignored when a command-line value exists for the key).path: Optional namespace or workspace path to identify the store entry. For member paths, the member with the longest matching path prefix is used (ignored when a command-line value exists for the key).
Returns
- The stored JSON value (string, number, bool, list, dict), or
Noneif the key is not found.
get_short_digest()
def get_short_digest() -> stringReturns the short digest of the workspace.
short_digest = workspace.get_short_digest()Returns
str: The short digest string of the workspace.
get_path_to_build_checkout()
def get_path_to_build_checkout() -> stringReturns the path to the workspace build folder for the current script.
build_path = workspace.get_path_to_build_checkout(rule_name = "my_rule")Args
rule_name: string: The name of the rule to get the build checkout path for.
Returns
str: The path to the build directory associated with the current script evaluation.
get_digest()
def get_digest() -> stringReturns the digest of the workspace.
This is only meaningful if the workspace is reproducible, which is typically determined after the checkout process is complete.
digest = workspace.get_digest()Returns
str: The unique digest string of the workspace.
get_path_to_member()
def get_path_to_member() -> stringReturns the path to the workspace member matching the specified requirement.
If the member cannot be found, an error is raised.
member_req = {
"url": "https://github.com/example/repo.git",
"required": {"SemVer": "^1.2.0"}
}
path = workspace.get_path_to_member(member = member_req)Args
member: The requirements for the member, containingurl(str) andrequired(dict).
Returns
str: The workspace path to the matching member.
get_build_archive_info()
def get_build_archive_info()Returns the archive and sha256 file paths for a build archive.
archive_info = {
"input": "build/install",
"name": "my_archive",
"version": "1.0",
"driver": "tar.gz",
}
info = workspace.get_build_archive_info(rule_name = "my_rule", archive = archive_info)Args
rule_name: string: The name of the rule used to create the archive.archive: The archive info used to create the archive.
Returns
dict: A dictionary containingarchive_path(str) andsha256_path(str).
yaml
is_string_yaml()
def is_string_yaml(value) -> boolReturns True if the given string is valid YAML (single-document), False otherwise.
Only the first document is validated. The function never raises an error; it always returns a boolean.
if yaml.is_string_yaml("key: value"):
print("Valid YAML")Args
value: string: The string to check.
Returns
bool:Trueif the string parses as valid YAML,Falseotherwise.
try_string_to_dict()
def try_string_to_dict(content)Attempts to convert a YAML formatted string into a Starlark dictionary/value.
On success, returns the parsed Starlark value. On failure, returns the
value supplied to default (which itself defaults to None).
A named default parameter lets callers supply a sentinel that is
distinguishable from a successfully decoded YAML null:
MISSING = "PARSE_FAILED"
result = yaml.try_string_to_dict(raw, default = MISSING)
if result == MISSING:
print("input was not valid YAML")
elif result == None:
print("input was the YAML null literal")
else:
print(result["key"])When called without default the behaviour is: None is returned on
parse failure.
raw_data = "id: 101\nstatus: active\n"
result = yaml.try_string_to_dict(raw_data)
if result != None:
print(result["status"])
else:
print("Failed to parse YAML")Notes
- Multi-document input (multiple
----separated documents) causes a parse error;defaultis returned in that case. inf,-inf, and.nanfloats cause a post-parse conversion error even in this try variant.
Args
content: string: The YAML-formatted string to be converted.default: (named, optional): Value to return when parsing fails. Defaults toNone.
Returns
dict | <default>: A Starlark value representing the parsed YAML, or thedefaultvalue if parsing fails.
to_string()
def to_string(value) -> stringConverts a dictionary or Starlark value into a YAML-formatted string.
This is the inverse of string_to_dict. It takes structured data
and serializes it into a string, making it ready to be written
to a file or sent over a network.
Round-trip notes: The serialized YAML will not contain comments, anchors/aliases, or the original quoting style. Key ordering follows the Starlark dict iteration order (insertion order), which may differ from the original source if the value was parsed from YAML.
data = {"name": "Project Alpha", "version": 1}
yaml_string = yaml.to_string(data)Args
value: The dictionary or Starlark value to be serialized.
Returns
str: The YAML string representation of the input value.
string_to_dict()
def string_to_dict(content)Converts a YAML formatted string into a Starlark dictionary/value.
The YAML is parsed with serde_yaml, which:
- Multi-document input is not supported — if the input contains
multiple
----separated documents,serde_yaml0.9 returns a parse error. Split multi-document streams into individual document strings before calling this function. A single document that begins with a leading---marker is accepted. - Resolves anchors and aliases transparently —
&anchor/*aliassyntax is expanded during parsing. Circular references are rejected with an error. - The YAML 1.1 merge key
<<:is NOT supported —serde_yaml0.9 targets YAML 1.2, which does not include merge keys.<<is treated as a plain string key. Perform merges manually withyaml_merge(). - Does not evaluate arbitrary tags — unknown YAML tags (e.g.
!!python/object) are rejected; the loader is safe. - Round-trip lossiness — comments, key insertion order, and quoting
style are not preserved.
inf,-inf, and.nanfloat literals are valid YAML but have no JSON/Starlark representation and will produce an error.
raw_data = "id: 101\nstatus: active\n"
data_dict = yaml.string_to_dict(raw_data)
print(data_dict["status"])Args
content: string: The YAML-formatted string to be converted.
Returns
dict: A dictionary representation of the YAML data.
zip()
def zip() -> listzip: zip several iterables together
zip() returns a new list of n-tuples formed from corresponding
elements of each of the n iterable sequences provided as arguments to
zip. That is, the first tuple contains the first element of each of
the sequences, the second element contains the second element of each
of the sequences, and so on. The result list is only as long as the
shortest of the input sequences.
zip() == []
zip(range(5)) == [(0,), (1,), (2,), (3,), (4,)]
zip(range(5), "abc".elems()) == [(0, "a"), (1, "b"), (2, "c")]