147 lines
4.6 KiB
Python
147 lines
4.6 KiB
Python
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
|
|
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
|
|
|
|
"""
|
|
Functions to manipulate packed binary representations of number sets.
|
|
|
|
To save space, coverage stores sets of line numbers in SQLite using a packed
|
|
binary representation called a numbits. A numbits is a set of positive
|
|
integers.
|
|
|
|
A numbits is stored as a blob in the database. The exact meaning of the bytes
|
|
in the blobs should be considered an implementation detail that might change in
|
|
the future. Use these functions to work with those binary blobs of data.
|
|
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import sqlite3
|
|
|
|
from itertools import zip_longest
|
|
from typing import Iterable, List
|
|
|
|
|
|
def nums_to_numbits(nums: Iterable[int]) -> bytes:
|
|
"""Convert `nums` into a numbits.
|
|
|
|
Arguments:
|
|
nums: a reusable iterable of integers, the line numbers to store.
|
|
|
|
Returns:
|
|
A binary blob.
|
|
"""
|
|
try:
|
|
nbytes = max(nums) // 8 + 1
|
|
except ValueError:
|
|
# nums was empty.
|
|
return b""
|
|
b = bytearray(nbytes)
|
|
for num in nums:
|
|
b[num//8] |= 1 << num % 8
|
|
return bytes(b)
|
|
|
|
|
|
def numbits_to_nums(numbits: bytes) -> List[int]:
|
|
"""Convert a numbits into a list of numbers.
|
|
|
|
Arguments:
|
|
numbits: a binary blob, the packed number set.
|
|
|
|
Returns:
|
|
A list of ints.
|
|
|
|
When registered as a SQLite function by :func:`register_sqlite_functions`,
|
|
this returns a string, a JSON-encoded list of ints.
|
|
|
|
"""
|
|
nums = []
|
|
for byte_i, byte in enumerate(numbits):
|
|
for bit_i in range(8):
|
|
if (byte & (1 << bit_i)):
|
|
nums.append(byte_i * 8 + bit_i)
|
|
return nums
|
|
|
|
|
|
def numbits_union(numbits1: bytes, numbits2: bytes) -> bytes:
|
|
"""Compute the union of two numbits.
|
|
|
|
Returns:
|
|
A new numbits, the union of `numbits1` and `numbits2`.
|
|
"""
|
|
byte_pairs = zip_longest(numbits1, numbits2, fillvalue=0)
|
|
return bytes(b1 | b2 for b1, b2 in byte_pairs)
|
|
|
|
|
|
def numbits_intersection(numbits1: bytes, numbits2: bytes) -> bytes:
|
|
"""Compute the intersection of two numbits.
|
|
|
|
Returns:
|
|
A new numbits, the intersection `numbits1` and `numbits2`.
|
|
"""
|
|
byte_pairs = zip_longest(numbits1, numbits2, fillvalue=0)
|
|
intersection_bytes = bytes(b1 & b2 for b1, b2 in byte_pairs)
|
|
return intersection_bytes.rstrip(b"\0")
|
|
|
|
|
|
def numbits_any_intersection(numbits1: bytes, numbits2: bytes) -> bool:
|
|
"""Is there any number that appears in both numbits?
|
|
|
|
Determine whether two number sets have a non-empty intersection. This is
|
|
faster than computing the intersection.
|
|
|
|
Returns:
|
|
A bool, True if there is any number in both `numbits1` and `numbits2`.
|
|
"""
|
|
byte_pairs = zip_longest(numbits1, numbits2, fillvalue=0)
|
|
return any(b1 & b2 for b1, b2 in byte_pairs)
|
|
|
|
|
|
def num_in_numbits(num: int, numbits: bytes) -> bool:
|
|
"""Does the integer `num` appear in `numbits`?
|
|
|
|
Returns:
|
|
A bool, True if `num` is a member of `numbits`.
|
|
"""
|
|
nbyte, nbit = divmod(num, 8)
|
|
if nbyte >= len(numbits):
|
|
return False
|
|
return bool(numbits[nbyte] & (1 << nbit))
|
|
|
|
|
|
def register_sqlite_functions(connection: sqlite3.Connection) -> None:
|
|
"""
|
|
Define numbits functions in a SQLite connection.
|
|
|
|
This defines these functions for use in SQLite statements:
|
|
|
|
* :func:`numbits_union`
|
|
* :func:`numbits_intersection`
|
|
* :func:`numbits_any_intersection`
|
|
* :func:`num_in_numbits`
|
|
* :func:`numbits_to_nums`
|
|
|
|
`connection` is a :class:`sqlite3.Connection <python:sqlite3.Connection>`
|
|
object. After creating the connection, pass it to this function to
|
|
register the numbits functions. Then you can use numbits functions in your
|
|
queries::
|
|
|
|
import sqlite3
|
|
from coverage.numbits import register_sqlite_functions
|
|
|
|
conn = sqlite3.connect("example.db")
|
|
register_sqlite_functions(conn)
|
|
c = conn.cursor()
|
|
# Kind of a nonsense query:
|
|
# Find all the files and contexts that executed line 47 in any file:
|
|
c.execute(
|
|
"select file_id, context_id from line_bits where num_in_numbits(?, numbits)",
|
|
(47,)
|
|
)
|
|
"""
|
|
connection.create_function("numbits_union", 2, numbits_union)
|
|
connection.create_function("numbits_intersection", 2, numbits_intersection)
|
|
connection.create_function("numbits_any_intersection", 2, numbits_any_intersection)
|
|
connection.create_function("num_in_numbits", 2, num_in_numbits)
|
|
connection.create_function("numbits_to_nums", 1, lambda b: json.dumps(numbits_to_nums(b)))
|