You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

138 lines
4.2 KiB

"""Functions that help us generate and use info.json files.
"""
import json
import hjson
import jsonschema
from collections.abc import Mapping
from functools import lru_cache
from typing import OrderedDict
from pathlib import Path
from milc import cli
def _dict_raise_on_duplicates(ordered_pairs):
"""Reject duplicate keys."""
d = {}
for k, v in ordered_pairs:
if k in d:
raise ValueError("duplicate key: %r" % (k,))
else:
d[k] = v
return d
def json_load(json_file, strict=True):
"""Load a json file from disk.
Note: file must be a Path object.
"""
try:
# Get the IO Stream for Path objects
# Not necessary if the data is provided via stdin
if isinstance(json_file, Path):
json_file = json_file.open(encoding='utf-8')
return hjson.load(json_file, object_pairs_hook=_dict_raise_on_duplicates if strict else None)
except (json.decoder.JSONDecodeError, hjson.HjsonDecodeError) as e:
cli.log.error('Invalid JSON encountered attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
exit(1)
except Exception as e:
cli.log.error('Unknown error attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
exit(1)
@lru_cache(maxsize=0)
def load_jsonschema(schema_name):
"""Read a jsonschema file from disk.
"""
if Path(schema_name).exists():
return json_load(schema_name)
schema_path = Path(f'data/schemas/{schema_name}.jsonschema')
if not schema_path.exists():
schema_path = Path('data/schemas/false.jsonschema')
return json_load(schema_path)
@lru_cache(maxsize=0)
def compile_schema_store():
"""Compile all our schemas into a schema store.
"""
schema_store = {}
for schema_file in Path('data/schemas').glob('*.jsonschema'):
schema_data = load_jsonschema(schema_file)
if not isinstance(schema_data, dict):
cli.log.debug('Skipping schema file %s', schema_file)
continue
schema_store[schema_data['$id']] = schema_data
return schema_store
@lru_cache(maxsize=0)
def create_validator(schema):
"""Creates a validator for the given schema id.
"""
schema_store = compile_schema_store()
resolver = jsonschema.RefResolver.from_schema(schema_store[schema], store=schema_store)
return jsonschema.Draft202012Validator(schema_store[schema], resolver=resolver).validate
def validate(data, schema):
"""Validates data against a schema.
"""
validator = create_validator(schema)
return validator(data)
def deep_update(origdict, newdict):
"""Update a dictionary in place, recursing to do a depth-first deep copy.
"""
for key, value in newdict.items():
if isinstance(value, Mapping):
origdict[key] = deep_update(origdict.get(key, {}), value)
else:
origdict[key] = value
return origdict
def merge_ordered_dicts(dicts):
"""Merges nested OrderedDict objects resulting from reading a hjson file.
Later input dicts overrides earlier dicts for plain values.
If any value is "!delete!", the existing value will be removed from its parent.
Arrays will be appended. If the first entry of an array is "!reset!", the contents of the array will be cleared and replaced with RHS.
Dictionaries will be recursively merged. If any entry is "!reset!", the contents of the dictionary will be cleared and replaced with RHS.
"""
result = OrderedDict()
def add_entry(target, k, v):
if k in target and isinstance(v, (OrderedDict, dict)):
if "!reset!" in v:
target[k] = v
else:
target[k] = merge_ordered_dicts([target[k], v])
if "!reset!" in target[k]:
del target[k]["!reset!"]
elif k in target and isinstance(v, list):
if v[0] == '!reset!':
target[k] = v[1:]
else:
target[k] = target[k] + v
elif v == "!delete!" and isinstance(target, (OrderedDict, dict)):
del target[k]
else:
target[k] = v
for d in dicts:
for (k, v) in d.items():
add_entry(result, k, v)
return result