PKCS#11 support
References:
- https://docs.aws.amazon.com/cloudhsm/latest/userguide/pkcs11-v3-samples.html
- https://github.com/danni/python-pkcs11?tab=readme-ov-file#tested-compatibility
- https://github.com/ThalesGroup/pycryptoki
Is there any progress on this? Special signer? Key object classes, that support PKCS11 Keys and certs?
There is no progress yet, and I expect this to be a complex change. PRs and support donations are welcome.
Maybe not the most elegant solution, but this worked for me using the PyKCS11 library:
import PyKCS11
from lxml import etree
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from signxml.xades import XAdESSigner, XAdESVerifier, XAdESVerifyResult, XAdESSignaturePolicy, XAdESDataObjectFormat
import binascii
class PublicNumbers:
    """Plain holder for the two values of an RSA public key.

    ``n`` is the modulus and ``e`` the public exponent, exposed under the
    attribute names that signxml reads when it serialises the key.
    """

    def __init__(self, n, e):
        self.n, self.e = n, e
class PublicKey:
    """Minimal public-key object: only ``public_numbers()`` is provided."""

    def __init__(self, n, e):
        numbers = PublicNumbers(n, e)
        self.pubnumbers = numbers

    def public_numbers(self):
        """Return the ``PublicNumbers`` instance built in the constructor."""
        return self.pubnumbers
class Key:
    """RSA key living on a PKCS#11 token, shaped like the key object signxml expects.

    Only ``sign()`` and ``public_key()`` are implemented. The signature is
    computed by the token; this class only holds object references.
    """

    # Hash name -> name of the PyKCS11 "hash + RSA PKCS#1 v1.5" mechanism.
    _MECHANISM_BY_HASH = {
        "sha1": "CKM_SHA1_RSA_PKCS",
        "sha224": "CKM_SHA224_RSA_PKCS",
        "sha256": "CKM_SHA256_RSA_PKCS",
        "sha384": "CKM_SHA384_RSA_PKCS",
        "sha512": "CKM_SHA512_RSA_PKCS",
    }

    def __init__(self, session, keyid):
        """Read modulus and public exponent of the public key with id ``keyid``.

        Args:
            session: an open (and, if required, logged-in) PyKCS11 session.
            keyid: CKA_ID value shared by the key pair.

        Raises:
            LookupError: if the token has no public key with that id.
        """
        self.session = session
        self.keyid = keyid
        pubkey = self._find_one(PyKCS11.CKO_PUBLIC_KEY)
        modulus, exponent = session.getAttributeValue(
            pubkey, [PyKCS11.CKA_MODULUS, PyKCS11.CKA_PUBLIC_EXPONENT]
        )
        # Both attributes are big-endian byte sequences.
        n = int.from_bytes(bytes(bytearray(modulus)), "big")
        e = int.from_bytes(bytes(bytearray(exponent)), "big")
        self.pubkey = PublicKey(n, e)

    def _find_one(self, object_class):
        """Return the first token object of ``object_class`` carrying ``self.keyid``."""
        found = self.session.findObjects(
            [(PyKCS11.CKA_CLASS, object_class), (PyKCS11.CKA_ID, self.keyid)]
        )
        if not found:
            raise LookupError("no matching key object found on the token")
        return found[0]

    def sign(self, data, padding, algorithm):
        """Sign ``data`` on the token and return the signature bytes.

        ``padding`` is accepted for interface compatibility but ignored: the
        mechanisms used here always apply PKCS#1 v1.5 padding. The hash is
        chosen from ``algorithm.name``; when that attribute is missing or
        unknown, SHA-256 is used.
        """
        privkey = self._find_one(PyKCS11.CKO_PRIVATE_KEY)
        hash_name = getattr(algorithm, "name", "sha256")
        mechanism_name = self._MECHANISM_BY_HASH.get(hash_name, "CKM_SHA256_RSA_PKCS")
        mechanism = PyKCS11.Mechanism(getattr(PyKCS11, mechanism_name))
        return bytes(self.session.sign(privkey, data, mechanism))

    def public_key(self):
        """Return the ``PublicKey`` built in the constructor."""
        return self.pubkey
class Signer:
    """Sign ``file.xml`` with a key on a PKCS#11 token and verify the result.

    All work happens in the constructor: the signed document is written to
    ``signed.xml`` and then verified against the certificate read from the
    token. The module-level names ``pkcs11lib`` (path of the PKCS#11 library)
    and ``pin`` must be defined by the user.
    """

    def __init__(self):
        # Must exist before anything can fail, otherwise logout() in the
        # ``finally`` clause raises AttributeError and hides the real error.
        self.session = None
        self.pkcs11 = PyKCS11.PyKCS11Lib()
        self.pkcs11.load(pkcs11lib)
        try:
            self.slot = self.get_slot()
            with open("file.xml", "rb") as fp:
                root = etree.fromstring(fp.read())
            keyid, cert = self.get_cert()
            key = Key(self.session, keyid)
            data_object_format = XAdESDataObjectFormat(
                Description="My XAdES signature",
                MimeType="text/xml",
            )
            signer = XAdESSigner(
                signature_policy=None,
                claimed_roles=["signer"],
                data_object_format=data_object_format,
                c14n_algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315",
            )
            signed_root = signer.sign(root, key=key, cert=cert)
            with open("signed.xml", "wb") as fp:
                fp.write(etree.tostring(signed_root))
            verifier = XAdESVerifier()
            verifier.verify(
                signed_root,
                x509_cert=cert,
                expect_references=3,
                expect_signature_policy=None,
            )
        finally:
            self.logout()

    def get_slot(self):
        """Return the first slot that has a token present.

        Raises:
            LookupError: if no token is present.
        """
        slots = self.pkcs11.getSlotList(tokenPresent=True)
        if not slots:
            raise LookupError("no PKCS#11 token present")
        return slots[0]

    def get_cert(self):
        """Open a session, log in and return ``(keyid, PEM certificate)``.

        The first certificate object whose attributes can be read is used.

        Raises:
            LookupError: if no readable certificate is found (previously the
                method returned None and the caller failed on unpacking).
        """
        self.session = self.pkcs11.openSession(
            self.slot, PyKCS11.CKF_SERIAL_SESSION | PyKCS11.CKF_RW_SESSION
        )
        self.session.login(pin)
        wanted = [PyKCS11.CKA_VALUE, PyKCS11.CKA_ID]
        for handle in self.session.findObjects(
            [(PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE)]
        ):
            try:
                der, keyid = self.session.getAttributeValue(handle, wanted)
            except PyKCS11.PyKCS11Error:
                # Unreadable object: try the next certificate.
                continue
            cert = x509.load_der_x509_certificate(bytes(der), backend=default_backend())
            return bytes(keyid), cert.public_bytes(encoding=serialization.Encoding.PEM)
        raise LookupError("no readable certificate found on the token")

    def logout(self):
        """Log out and close the session, if one was opened."""
        if self.session is None:
            return
        try:
            self.session.logout()
        finally:
            # Close the session even when logout itself fails.
            try:
                self.session.closeSession()
            finally:
                self.session = None


if __name__ == "__main__":
    Signer()
Why does the Key class need a public_key()? And why does it have to be distilled to e and n? Is there something in the signing process that needs this part of the interface, with the data in this form?
Is this RSA specific?
signxml expects key to be bytes or a RSAPrivateKey instance, in case of bytes it will transform it to a RSAPrivateKey.
then in the _serialize_key_value function it calls the public_key() and public_numbers() methods to get the modulus and exponent, so I created a Key class that kinda emulates RSAPrivateKey and implements the sign() and public_key() methods which are needed by signxml.
and yes my code is RSA specific because if you look again at the _serialize_key_value method there are other methods and values that I have not implemented like parameter_numbers() or x and y
Thank you for the pointer. I see that in _serialize_key_value there are only methods to add info for RSA and ECDSA keys. For PKCS11, IMHO, there would need to be serialisation for X509Data (https://www.w3.org/TR/xmldsig-core/#sec-X509Data) instead of KeyValue. You provided the X509 cert, so it would be nice to serialize it as X509, just for starters. You made a great example. I have an EC-based card and will try to get it to work on it. PyKCS11 is missing some ECDSA mechanisms.
I made it sign, but signxml thinks I signed with rsa-sha256; at least this is what I got in the XML. Verification dies, but I know why. The sign method that you overrode is one part. I think we would also need a digest method to calculate the digest on the card. For both we would need to return the mechanism type to signxml.
Digest is a slightly bigger problem, as it lives in XMLSignatureProcessor and just uses a Hash. So it is a little hidden; otherwise all signers need to be overridden.
Maybe a signer could take a Hasher parameter that exposes a digest method. By default it would be implemented with Hash, but if overridden it would calculate the digest, in our case, on the card.
This is how X509Data could be made (it is written with python-pkcs11 and OpenSSL, but...):
# Build ds:X509Data for the signing certificate (with issuer/serial/subject)
# directly under key_info_element, and collect every other certificate found
# on the token in a second ds:X509Data that is appended only if non-empty.
ds_ns = "http://www.w3.org/2000/09/xmldsig#"
X509_crt_element = etree.Element(etree.QName(ds_ns, 'X509Data'))
has_crt = False
# Walk over all certificate objects on the token
for co in session.get_objects({Attribute.CLASS: ObjectClass.CERTIFICATE}):
    cb4 = base64.b64encode(bytes(co[Attribute.VALUE])).decode('utf-8')
    if co[Attribute.LABEL] != priv.label:
        # Not the signing certificate: add it to the certificate chain
        chain_cert_element = etree.SubElement(
            X509_crt_element, etree.QName(ds_ns, 'X509Certificate')
        )
        chain_cert_element.text = cb4
        has_crt = True
        continue
    # Same label as the private key: fill in the data from this certificate
    X509_data_element = etree.SubElement(
        key_info_element, etree.QName(ds_ns, 'X509Data')
    )
    etree.SubElement(
        X509_data_element, etree.QName(ds_ns, 'X509Certificate')
    ).text = cb4
    cert = load_certificate(FILETYPE_ASN1, co[Attribute.VALUE])
    issuer_serial = etree.SubElement(
        X509_data_element, etree.QName(ds_ns, 'X509IssuerSerial')
    )
    issuer_parts = [
        "{:s}={:s}".format(name.decode(), value.decode())
        for name, value in cert.get_issuer().get_components()
    ]
    etree.SubElement(
        issuer_serial, etree.QName(ds_ns, 'X509IssuerName')
    ).text = ", ".join(issuer_parts)
    etree.SubElement(
        issuer_serial, etree.QName(ds_ns, 'X509SerialNumber')
    ).text = str(cert.get_serial_number())
    subject_parts = [
        "{:s}={:s}".format(name.decode(), value.decode())
        for name, value in cert.get_subject().get_components()
    ]
    etree.SubElement(
        X509_data_element, etree.QName(ds_ns, 'X509SubjectName')
    ).text = ", ".join(subject_parts)
if has_crt:
    # Add the certificate chain
    key_info_element.append(X509_crt_element)
It has a special feature to not just add a cert to verify, but also a chain if it is present on the card. Also there is an assumption to have a cert with the same label as private key.
This signs and verifies an EC key (a lot of patchwork, sorry):
import PyKCS11
from lxml import etree
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import utils
from signxml.xades import (
XAdESSigner,
XAdESVerifier,
XAdESVerifyResult,
XAdESSignaturePolicy,
XAdESDataObjectFormat,
)
from signxml.algorithms import SignatureMethod
import binascii
def decode_ec_point(keylen, data):
    """Turn an encoded EC point into an ``(x, y)`` pair of integers.

    ``data`` is either the single byte 0x00, which yields ``(None, None)``,
    or the uncompressed form: 0x04 followed by two big-endian coordinates of
    ``keylen`` bytes each. Anything else raises ``ValueError``.
    """
    if data == b"\x00":
        return None, None
    if not data.startswith(b"\x04"):
        raise ValueError("Unsupported elliptic curve point type")
    if len(data) != 2 * keylen + 1:
        raise ValueError("Invalid elliptic curve point data length")
    x_bytes = data[1 : keylen + 1]
    y_bytes = data[keylen + 1 :]
    return int.from_bytes(x_bytes, "big"), int.from_bytes(y_bytes, "big")
class RSAPublicNumbers:
    """Holder for an RSA modulus ``n`` and public exponent ``e``."""

    def __init__(self, n, e):
        self.n, self.e = n, e
class ECDSAPublicNumbers:
    """Holder for the affine coordinates ``x`` and ``y`` of an EC public point."""

    def __init__(self, x, y):
        self.x, self.y = x, y
class RSAPublicKey:
    """RSA public key exposing only ``public_numbers()``."""

    def __init__(self, n, e):
        numbers = RSAPublicNumbers(n, e)
        self.pubnumbers = numbers

    def public_numbers(self):
        """Return the ``RSAPublicNumbers`` built in the constructor."""
        return self.pubnumbers
class ECDSAPublicKey:
    """EC public key exposing only ``public_numbers()``."""

    def __init__(self, x, y):
        numbers = ECDSAPublicNumbers(x, y)
        self.pubnumbers = numbers

    def public_numbers(self):
        """Return the ``ECDSAPublicNumbers`` built in the constructor."""
        return self.pubnumbers
class ECDSACurve:
    """Carries nothing but the curve's ``name``."""

    def __init__(self, name):
        curve_name = name
        self.name = curve_name
class Key:
    """EC or RSA key living on a PKCS#11 token, shaped like the key object signxml expects.

    Only ``sign()`` and ``public_key()`` are implemented; for EC keys the
    ``curve`` and ``key_size`` attributes are filled in as well.
    """

    # Temporary: the curve is not read from the token, every EC key is
    # assumed to be on secp384r1.
    _EC_CURVE_NAME = "secp384r1"
    _EC_KEY_BITS = 384

    def __init__(self, session, keyid):
        """Load the public half of the key pair identified by ``keyid``.

        Args:
            session: an open (and, if required, logged-in) PyKCS11 session.
            keyid: CKA_ID value shared by the key pair.
        """
        self.session = session
        self.keyid = keyid
        self.curve = None
        self.key_size = None
        pubkey = self.session.findObjects(
            [
                (PyKCS11.CKA_CLASS, PyKCS11.CKO_PUBLIC_KEY),
                (PyKCS11.CKA_ID, self.keyid),
            ]
        )[0]
        key_type = session.getAttributeValue(pubkey, [PyKCS11.CKA_KEY_TYPE])[0]
        self._is_ec = key_type == PyKCS11.CKK_ECDSA
        if self._is_ec:
            self.curve = ECDSACurve(self._EC_CURVE_NAME)
            self.key_size = self._EC_KEY_BITS
            coord_len = (self.key_size + 7) // 8
            ec_point = session.getAttributeValue(pubkey, [PyKCS11.CKA_EC_POINT])[0]
            # PKCS#11 defines CKA_EC_POINT as a DER OCTET STRING wrapped
            # around the point. The header must be stripped first, otherwise
            # its bytes end up inside the x coordinate.
            point = self._raw_ec_point(bytes(ec_point), coord_len)
            x, y = decode_ec_point(coord_len, point)
            self.pubkey = ECDSAPublicKey(x, y)
        else:
            modulus, exponent = session.getAttributeValue(
                pubkey, [PyKCS11.CKA_MODULUS, PyKCS11.CKA_PUBLIC_EXPONENT]
            )
            n = int.from_bytes(bytes(bytearray(modulus)), "big")
            e = int.from_bytes(bytes(bytearray(exponent)), "big")
            self.pubkey = RSAPublicKey(n, e)

    @staticmethod
    def _raw_ec_point(point, coord_len):
        """Strip a DER OCTET STRING header from ``point`` when one is present.

        A value that already has the raw uncompressed length
        (``2 * coord_len + 1``) is returned unchanged, as is anything that
        does not parse as a single complete OCTET STRING.
        """
        if len(point) == 2 * coord_len + 1:
            return point
        if len(point) >= 2 and point[0] == 0x04:
            first = point[1]
            if first < 0x80:
                # short form: the byte itself is the length
                header, length = 2, first
            else:
                # long form: low 7 bits give the number of length bytes
                header = 2 + (first & 0x7F)
                length = int.from_bytes(point[2:header], "big")
            if header + length == len(point):
                return point[header:]
        return point

    def sign(self, data, **argv):
        """Sign ``data`` on the token with SHA-256.

        EC keys: the token output is treated as ``r || s`` (two halves of
        equal length) and returned DER-encoded via ``encode_dss_signature``.
        RSA keys: signed with PKCS#1 v1.5 and returned as raw bytes.
        Keyword arguments are accepted for interface compatibility and ignored.

        Raises:
            ValueError: if an EC signature has an odd or zero length.
        """
        privkey = self.session.findObjects(
            [
                (PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY),
                (PyKCS11.CKA_ID, self.keyid),
            ]
        )[0]
        if not self._is_ec:
            # An RSA key cannot be used with the ECDSA mechanism.
            mechanism = PyKCS11.Mechanism(PyKCS11.CKM_SHA256_RSA_PKCS)
            return bytes(self.session.sign(privkey, data, mechanism))
        mechanism = PyKCS11.Mechanism(PyKCS11.CKM_ECDSA_SHA256)
        raw = bytes(self.session.sign(privkey, data, mechanism))
        if not raw or len(raw) % 2:
            raise ValueError("Unexpected ECDSA signature length")
        half = len(raw) // 2
        return utils.encode_dss_signature(
            int.from_bytes(raw[:half], "big"), int.from_bytes(raw[half:], "big")
        )

    def public_key(self):
        """Return the public key object built in the constructor."""
        return self.pubkey
class Signer:
    """Sign ``file.xml`` with ECDSA-SHA256 using a key on a PKCS#11 token, then verify.

    All work happens in the constructor: the signed document is written to
    ``signed_signxml.xml`` and every verification result's ``signed_xml`` is
    printed. The module-level names ``pkcs11lib`` and ``pin`` must be defined
    by the user.
    """

    def __init__(self):
        self.pkcs11 = PyKCS11.PyKCS11Lib()
        self.pkcs11.load(pkcs11lib)
        self.session = None
        try:
            self.slot = self.get_slot()
            with open("file.xml", "rb") as fp:
                root = etree.fromstring(fp.read())
            keyid, cert = self.get_cert()
            key = Key(self.session, keyid)
            data_object_format = XAdESDataObjectFormat(
                Description="My XAdES signature",
                MimeType="text/xml",
            )
            signer = XAdESSigner(
                signature_policy=None,
                claimed_roles=["signer"],
                data_object_format=data_object_format,
                c14n_algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315",
                signature_algorithm=SignatureMethod.ECDSA_SHA256,
            )
            signed_root = signer.sign(root, key=key, cert=cert)
            with open("signed_signxml.xml", "wb") as fp:
                fp.write(etree.tostring(signed_root))
            verifier = XAdESVerifier()
            verify_results = verifier.verify(
                signed_root,
                x509_cert=cert,
                expect_references=3,
                expect_signature_policy=None,
                # KeyValue and the X509 certificate are both written by the
                # signer, so the key info is ambiguous to the verifier.
                ignore_ambiguous_key_info=True,
            )
            for verif in verify_results:
                print(verif.signed_xml)
        finally:
            self.logout()

    def get_slot(self):
        """Return the first slot that has a token present.

        Raises:
            LookupError: if no token is present.
        """
        slots = self.pkcs11.getSlotList(tokenPresent=True)
        if not slots:
            raise LookupError("no PKCS#11 token present")
        return slots[0]

    def get_cert(self):
        """Open a session, log in and return ``(keyid, PEM certificate)``.

        The first certificate object whose attributes can be read is used.

        Raises:
            LookupError: if no readable certificate is found (previously the
                method returned None and the caller failed on unpacking).
        """
        self.session = self.pkcs11.openSession(
            self.slot, PyKCS11.CKF_SERIAL_SESSION | PyKCS11.CKF_RW_SESSION
        )
        self.session.login(pin)
        wanted = [PyKCS11.CKA_VALUE, PyKCS11.CKA_ID]
        for handle in self.session.findObjects(
            [(PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE)]
        ):
            try:
                der, keyid = self.session.getAttributeValue(handle, wanted)
            except PyKCS11.PyKCS11Error:
                # Unreadable object: try the next certificate.
                continue
            cert = x509.load_der_x509_certificate(bytes(der), backend=default_backend())
            return bytes(keyid), cert.public_bytes(encoding=serialization.Encoding.PEM)
        raise LookupError("no readable certificate found on the token")

    def logout(self):
        """Log out and close the session, if one was opened."""
        if self.session is None:
            return
        try:
            self.session.logout()
        finally:
            # Close the session even when logout itself fails.
            try:
                self.session.closeSession()
            finally:
                self.session = None


if __name__ == "__main__":
    Signer()
As mentioned before, it would be nice to have just X509Data for the certificate information. In the end my case needed ignore_ambiguous_key_info=True to verify, because there were two different pieces of information about the public key (the public key itself and the X509 certificate).
Also it would also be good to use all certificate info on some cards, that have certificate chain written on them.
Here is a prototype:
import PyKCS11
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import utils
from signxml.algorithms import SignatureMethod, DigestAlgorithm
import binascii
# taken from pkcs11-tool.c
# Known curves as (name, dotted OID, hexlified CKA_EC_PARAMS, key size in bits).
# The third field is compared against binascii.hexlify() output, so every
# entry must be a lowercase ``bytes`` literal.
_ec_curve_info = [
    ("secp192r1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("prime192v1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("prime192v2", "1.2.840.10045.3.1.2", b"06082a8648ce3d030102", 192),
    ("prime192v3", "1.2.840.10045.3.1.3", b"06082a8648ce3d030103", 192),
    ("nistp192", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("ansiX9p192r1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("secp224r1", "1.3.132.0.33", b"06052b81040021", 224),
    ("nistp224", "1.3.132.0.33", b"06052b81040021", 224),
    ("prime256v1", "1.2.840.10045.3.1.7", b"06082a8648ce3d030107", 256),
    ("secp256r1", "1.2.840.10045.3.1.7", b"06082a8648ce3d030107", 256),
    # Fixed: the first OID byte was 0x2b; arcs 1.2 encode to 0x2a.
    ("ansiX9p256r1", "1.2.840.10045.3.1.7", b"06082a8648ce3d030107", 256),
    ("frp256v1", "1.2.250.1.223.101.256.1", b"060a2a817a01815f65820001", 256),
    ("secp384r1", "1.3.132.0.34", b"06052b81040022", 384),
    # ("prime384v1", "1.3.132.0.34", b"06052b81040022", 384),
    # ("ansiX9p384r1", "1.3.132.0.34", b"06052b81040022", 384),
    ("prime521v1", "1.3.132.0.35", b"06052b81040023", 521),
    ("secp521r1", "1.3.132.0.35", b"06052b81040023", 521),
    ("nistp521", "1.3.132.0.35", b"06052b81040023", 521),
    ("brainpoolP192r1", "1.3.36.3.3.2.8.1.1.3", b"06092b2403030208010103", 192),
    ("brainpoolP224r1", "1.3.36.3.3.2.8.1.1.5", b"06092b2403030208010105", 224),
    ("brainpoolP256r1", "1.3.36.3.3.2.8.1.1.7", b"06092b2403030208010107", 256),
    ("brainpoolP320r1", "1.3.36.3.3.2.8.1.1.9", b"06092b2403030208010109", 320),
    ("brainpoolP384r1", "1.3.36.3.3.2.8.1.1.11", b"06092b240303020801010b", 384),
    ("brainpoolP512r1", "1.3.36.3.3.2.8.1.1.13", b"06092b240303020801010d", 512),
    ("secp192k1", "1.3.132.0.31", b"06052b8104001f", 192),
    ("secp256k1", "1.3.132.0.10", b"06052b8104000a", 256),
    # NOTE(review): shares the secp521r1 OID and is therefore never the first
    # match; verify that this entry is intended.
    ("secp521k1", "1.3.132.0.35", b"06052b81040023", 521),
    # Fixed: was a str literal, which never compares equal to hexlify() output.
    # NOTE(review): the dotted OIDs of the two 25519 entries look truncated;
    # they are not used for matching, confirm against the upstream table.
    ("edwards25519", "1.3.6.1.4.1159.15.1", b"130c656477617264733235353139", 255),
    ("curve25519", "1.3.6.1.4.3029.1.5.1", b"130b63757276653235353139", 255),
]
def _translate_to_ec_key_data(ec_params):
    """Look up ``ec_params`` (hexlified EC parameters) in ``_ec_curve_info``.

    Returns the first tuple whose third field equals ``ec_params``, or None
    when no entry matches.
    """
    matches = (entry for entry in _ec_curve_info if entry[2] == ec_params)
    return next(matches, None)
# Decode EC point x and y for public key description
def decode_ec_point(keylen, data):
    """Turn an encoded EC point into an ``(x, y)`` pair of integers.

    ``data`` is either the single byte 0x00, which yields ``(None, None)``,
    or the uncompressed form: 0x04 followed by two big-endian coordinates of
    ``keylen`` bytes each. Anything else raises ``ValueError``.
    """
    if data == b"\x00":
        return None, None
    if not data.startswith(b"\x04"):
        raise ValueError("Unsupported elliptic curve point type")
    if len(data) != 2 * keylen + 1:
        raise ValueError("Invalid elliptic curve point data length")
    x_bytes = data[1 : keylen + 1]
    y_bytes = data[keylen + 1 :]
    return int.from_bytes(x_bytes, "big"), int.from_bytes(y_bytes, "big")
# ECDSA signatures come from the card as raw r||s; to re-encode them we need r and s separately
def decode_RS_signature(data):
    """Split a raw ``r || s`` signature into its two halves.

    Args:
        data: any sequence of byte values (bytes, bytearray, list of ints, ...).

    Returns:
        ``(r, s)`` as two bytearrays. For an odd length the first half gets
        the extra byte.
    """
    raw = bytearray(data)
    # Integer midpoint, rounded up, instead of a float compared per byte.
    midpoint = (len(raw) + 1) // 2
    return raw[:midpoint], raw[midpoint:]
# public key information presentation for signxml
class RSAPublicNumbers:
    """Holder for an RSA modulus ``n`` and public exponent ``e``."""

    def __init__(self, n, e):
        self.n, self.e = n, e
class ECDSAPublicNumbers:
    """Holder for the affine coordinates ``x`` and ``y`` of an EC public point."""

    def __init__(self, x, y):
        self.x, self.y = x, y
class RSAPublicKey:
    """RSA public key exposing only ``public_numbers()``."""

    def __init__(self, n, e):
        numbers = RSAPublicNumbers(n, e)
        self.pubnumbers = numbers

    def public_numbers(self):
        """Return the ``RSAPublicNumbers`` built in the constructor."""
        return self.pubnumbers
class ECDSAPublicKey:
    """EC public key exposing only ``public_numbers()``."""

    def __init__(self, x, y):
        numbers = ECDSAPublicNumbers(x, y)
        self.pubnumbers = numbers

    def public_numbers(self):
        """Return the ``ECDSAPublicNumbers`` built in the constructor."""
        return self.pubnumbers
class ECDSACurve:
    """Carries nothing but the curve's ``name``."""

    def __init__(self, name):
        curve_name = name
        self.name = curve_name
# Token representation for use in signing.
# signxml can take it as a key because it implements sign() and the public key presentation.
# In addition it loads certificate data, either as the signing certificate alone or as the signing certificate plus its CA chain.
class PKCS11Token:
    """A PKCS#11 token usable as the ``key`` of a signxml signer.

    ``open_session`` connects to the token and loads the private-key
    reference, the public-key numbers, the signing certificate and all
    certificates on the token. ``sign`` and ``public_key`` form the interface
    signxml calls; ``close`` releases the session.
    """

    def __init__(self):
        # session for interaction with the card
        self._session = None
        # id of the key, read from the private key object
        self._keyid = None
        # type of key (a PKCS#11 CKK_* value)
        self._key_type = None
        # private key reference
        self._private_key = None
        # public key presentation (RSAPublicKey or ECDSAPublicKey)
        self._pubkey = None
        # signing certificate, PEM
        self._certificate = None
        # every certificate on the token, concatenated PEM
        self._ca_chain = None
        # operations supported by the card, grouped by operation name
        # (DIGEST, VERIFY, SIGN, ...): {operation: {algorithm: mechanism id}}
        self._operations = None
        # does the user need to be logged in to use the session
        self._login_required = False
        # public key data for an EC key, required by signxml key serialisation
        self.curve = None
        self.key_size = None

    @staticmethod
    def _raw_ec_point(point, coord_len):
        """Strip a DER OCTET STRING header from ``point`` when one is present.

        A value that already has the raw uncompressed length
        (``2 * coord_len + 1``) is returned unchanged, as is anything that
        does not parse as a single complete OCTET STRING.
        """
        if len(point) == 2 * coord_len + 1:
            return point
        if len(point) >= 2 and point[0] == 0x04:
            first = point[1]
            if first < 0x80:
                # short form: the byte itself is the length
                header, length = 2, first
            else:
                # long form: low 7 bits give the number of length bytes
                header = 2 + (first & 0x7F)
                length = int.from_bytes(point[2:header], "big")
            if header + length == len(point):
                return point[header:]
        return point

    def _get_private_key(self, session):
        """Store the first private key object together with its type and id.

        Raises:
            LookupError: if the token exposes no private key object.
        """
        if session is None:
            return
        handles = session.findObjects(
            [(PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY)]
        )
        if not handles:
            raise LookupError("no private key object found on the token")
        self._private_key = handles[0]
        key_type, key_id = session.getAttributeValue(
            self._private_key, [PyKCS11.CKA_KEY_TYPE, PyKCS11.CKA_ID]
        )
        self._key_type = key_type
        self._keyid = bytes(key_id)

    def _get_public_key(self, session):
        """Build the public-key presentation for the loaded private key.

        Only needed while the signer asks for public key data; it would be
        unnecessary if the signer serialised just the certificate as X509Data.
        For an EC key on a curve missing from ``_ec_curve_info`` nothing is
        stored.
        """
        if session is None or self._key_type is None:
            return
        pubkey = session.findObjects(
            [
                (PyKCS11.CKA_CLASS, PyKCS11.CKO_PUBLIC_KEY),
                (PyKCS11.CKA_ID, self._keyid),
            ]
        )[0]
        if self._key_type == PyKCS11.CKK_ECDSA:
            ec_point, ec_params = session.getAttributeValue(
                pubkey, [PyKCS11.CKA_EC_POINT, PyKCS11.CKA_EC_PARAMS]
            )
            curve_object = _translate_to_ec_key_data(
                binascii.hexlify(bytearray(ec_params))
            )
            if curve_object is None:
                return
            self.curve = ECDSACurve(curve_object[0])
            self.key_size = curve_object[3]
            coord_len = (self.key_size + 7) // 8
            # PKCS#11 defines CKA_EC_POINT as a DER OCTET STRING wrapped
            # around the point. The header must be stripped first, otherwise
            # its bytes end up inside the x coordinate.
            point = self._raw_ec_point(bytes(ec_point), coord_len)
            x, y = decode_ec_point(coord_len, point)
            self._pubkey = ECDSAPublicKey(x, y)
        else:
            # PyKCS11.CKK_RSA
            modulus, exponent = session.getAttributeValue(
                pubkey, [PyKCS11.CKA_MODULUS, PyKCS11.CKA_PUBLIC_EXPONENT]
            )
            n = int.from_bytes(bytes(bytearray(modulus)), "big")
            e = int.from_bytes(bytes(bytearray(exponent)), "big")
            self._pubkey = RSAPublicKey(n, e)

    def _get_cert(self, session):
        """Store, PEM-encoded, the certificate whose id matches the private key.

        If several certificates match, the last readable one is kept.
        """
        if session is None:
            return
        handles = session.findObjects(
            [
                (PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE),
                (PyKCS11.CKA_ID, self._keyid),
            ]
        )
        for handle in handles:
            try:
                der = session.getAttributeValue(handle, [PyKCS11.CKA_VALUE])[0]
            except PyKCS11.PyKCS11Error:
                continue
            cert = x509.load_der_x509_certificate(bytes(der), backend=default_backend())
            self._certificate = cert.public_bytes(encoding=serialization.Encoding.PEM)

    def _get_cert_with_ca_chain(self, session):
        """Store every readable certificate on the token as concatenated PEM."""
        if session is None:
            return
        ca_chain = []
        for handle in session.findObjects(
            [(PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE)]
        ):
            try:
                der = session.getAttributeValue(handle, [PyKCS11.CKA_VALUE])[0]
            except PyKCS11.PyKCS11Error:
                continue
            cert = x509.load_der_x509_certificate(bytes(der), backend=default_backend())
            ca_chain.append(cert.public_bytes(encoding=serialization.Encoding.PEM))
        self._ca_chain = b"".join(ca_chain)

    def open_session(self, pksc11_lib, token_label, pin):
        """Open a session with the card and load everything needed to sign.

        Logs in with ``pin`` if the selected token requires it, records the
        permitted operations (mechanisms), then loads the private key
        reference, the public key info, the signing certificate and the
        certificates on the token.

        Args:
            pksc11_lib: path of the PKCS#11 library to load.
            token_label: label of the token to use; with None the last slot
                that has a token present is used.
            pin: user PIN.

        Returns:
            True when a session was opened, False when no slot matched.
        """
        library = PyKCS11.PyKCS11Lib()
        library.load(pksc11_lib)
        slot = None
        self._login_required = False
        for candidate in library.getSlotList(tokenPresent=True):
            # Query by slot id: ids are not guaranteed to equal list positions.
            info = library.getTokenInfo(candidate)
            if token_label is not None and info.label.strip() != token_label:
                continue
            slot = candidate
            # Only the selected token's flag decides whether we log in.
            self._login_required = info.flags & PyKCS11.CKF_LOGIN_REQUIRED != 0
            if token_label is not None:
                break
        if slot is not None:
            self._session = library.openSession(
                slot, PyKCS11.CKF_SERIAL_SESSION | PyKCS11.CKF_RW_SESSION
            )
        if self._session is None:
            return False
        if self._login_required:
            self._session.login(pin)
        self._operations = {}
        for mech in library.getMechanismList(slot):
            mech_info = library.getMechanismInfo(slot, mech)
            short_name = str(mech).replace("CKM_", "")
            for flag, flag_name in mech_info.flags_dict.items():
                if mech_info.flags & flag == 0:
                    continue
                op = flag_name.replace("CKF_", "")
                if op == "DIGEST":
                    algorithms = DigestAlgorithm
                elif op in ("SIGN", "VERIFY"):
                    algorithms = SignatureMethod
                else:
                    continue
                try:
                    algorithm = algorithms[short_name]
                except KeyError:
                    # signxml has no algorithm of that name
                    continue
                self._operations.setdefault(op, {})[algorithm] = PyKCS11.CKM[mech]
        # It is an assumption that each token has one private key, multiple
        # public keys and multiple certificates.
        self._get_private_key(self._session)
        self._get_public_key(self._session)
        self._get_cert(self._session)
        self._get_cert_with_ca_chain(self._session)
        return True

    # signxml interface
    def sign(self, data, **argv):
        """Sign ``data`` on the card.

        Returns None when no session or private key is loaded. For EC and DSA
        keys the card's ``r || s`` output is re-encoded with
        ``encode_dss_signature``, because the caller decodes it again; other
        keys return the raw signature bytes. Keyword arguments are currently
        ignored.
        """
        if (
            self._session is None
            or self._private_key is None
            or self._key_type is None
        ):
            return None
        # The caller does not propagate a SignatureMethod yet, so the method
        # is pinned; remove this after integration.
        # NOTE(review): this also applies to RSA keys - confirm the intent.
        method = SignatureMethod.ECDSA_SHA256
        mech_id = (self._operations or {}).get("SIGN", {}).get(method)
        if mech_id is None:
            sig = self._session.sign(self._private_key, data)
        else:
            sig = self._session.sign(
                self._private_key, data, PyKCS11.Mechanism(mech_id)
            )
        if self._key_type in (PyKCS11.CKK_ECDSA, PyKCS11.CKK_DSA):
            # To be removed after integration, together with the pinning above.
            r, s = decode_RS_signature(sig)
            return utils.encode_dss_signature(
                int.from_bytes(bytes(r), "big"), int.from_bytes(bytes(s), "big")
            )
        # PyKCS11.CKK_RSA
        return bytes(sig)

    def get_digest(self, data, algorithm: "DigestAlgorithm"):
        """Calculate the digest of ``data`` on the card.

        Returns None when there is no session or the card does not support
        ``algorithm``.
        """
        if self._session is None:
            return None
        mech_id = (self._operations or {}).get("DIGEST", {}).get(algorithm)
        if mech_id is None:
            return None
        return self._session.digest(data, PyKCS11.Mechanism(mech_id))

    def public_key(self):
        """Return the public key presentation, or None if none was loaded."""
        return self._pubkey

    def certificate(self):
        """Return the signing certificate (PEM), or None if none was loaded."""
        return self._certificate

    def certificate_with_ca_chain(self):
        """Return all certificates read from the token as concatenated PEM."""
        return self._ca_chain

    def close(self):
        """Log out if a login was performed and close the session."""
        if self._session is None:
            return
        try:
            if self._login_required:
                self._session.logout()
        finally:
            # Close the session even when logout fails.
            try:
                self._session.closeSession()
            finally:
                self._session = None
I took @Brandhor example, but I hope I did not break something. save it to file named PKCS11_token.py so that then this works:
from PKCS11_token import PKCS11Token
from lxml import etree
from signxml.algorithms import SignatureMethod
from signxml.xades import (
XAdESSigner,
XAdESVerifier,
XAdESDataObjectFormat,
)
class Signer:
    """Sign ``file.xml`` with a ``PKCS11Token`` and verify the result.

    All work happens in the constructor: the signed document is written to
    ``signed_signxml.xml`` and every verification result's ``signed_xml`` is
    printed. If the token session cannot be opened, nothing is signed. The
    module-level names ``pkcs11lib``, ``token_label`` and ``pin`` must be
    defined by the user.
    """

    def __init__(self):
        # Context manager so the handle is released even if reading fails.
        with open("file.xml", "rb") as fp:
            root = etree.fromstring(fp.read())
        token = None
        try:
            token = PKCS11Token()
            if token.open_session(pkcs11lib, token_label, pin):
                data_object_format = XAdESDataObjectFormat(
                    Description="My XAdES signature",
                    MimeType="text/xml",
                )
                signer = XAdESSigner(
                    signature_policy=None,
                    claimed_roles=["signer"],
                    data_object_format=data_object_format,
                    c14n_algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315",
                    signature_algorithm=SignatureMethod.ECDSA_SHA256,
                )
                signed_root = signer.sign(
                    root, key=token, cert=token.certificate_with_ca_chain()
                )
                with open("signed_signxml.xml", "wb") as fp:
                    fp.write(etree.tostring(signed_root))
                verifier = XAdESVerifier()
                verify_results = verifier.verify(
                    signed_root,
                    x509_cert=token.certificate(),
                    expect_references=3,
                    expect_signature_policy=None,
                    # The signer writes both KeyValue and the X509
                    # certificate, so the key info is ambiguous.
                    ignore_ambiguous_key_info=True,
                )
                for verif in verify_results:
                    print(verif.signed_xml)
        finally:
            if token is not None:
                token.close()


if __name__ == "__main__":
    Signer()
In the EC key case it needs ignore_ambiguous_key_info=True when verifying, because the signer writes both KeyValue and X509Certificate. The token prototype also includes a login check and an operation check. For integration there needs to be a separate path for sign: it would be nice to get signature_algorithm from the caller (currently it gets a class from cryptography), and when the signature is returned I think the token should not need to encode it only for the signer to decode it again.
If signxml can write just signing certificate without public key (KeyValue) it also cleans up the interface and removes the need to get public key as it is a subset of certificate. I found out that this happens when you add always_add_key_value=False,to call to sign. We still need to get key_size from private key for recoding the signature (And there is an ointment for that ;-) )
Then the prototype can be like this:
import PyKCS11
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import utils
from signxml.algorithms import SignatureMethod, DigestAlgorithm
import binascii
# taken from pkcs11-tool.c
# Known curves as (name, dotted OID, hexlified CKA_EC_PARAMS, key size in bits).
# The third field is compared against binascii.hexlify() output, so every
# entry must be a lowercase ``bytes`` literal.
_ec_curve_info = [
    ("secp192r1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("prime192v1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("prime192v2", "1.2.840.10045.3.1.2", b"06082a8648ce3d030102", 192),
    ("prime192v3", "1.2.840.10045.3.1.3", b"06082a8648ce3d030103", 192),
    ("nistp192", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("ansiX9p192r1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("secp224r1", "1.3.132.0.33", b"06052b81040021", 224),
    ("nistp224", "1.3.132.0.33", b"06052b81040021", 224),
    ("prime256v1", "1.2.840.10045.3.1.7", b"06082a8648ce3d030107", 256),
    ("secp256r1", "1.2.840.10045.3.1.7", b"06082a8648ce3d030107", 256),
    # Fixed: the first OID byte was 0x2b; arcs 1.2 encode to 0x2a.
    ("ansiX9p256r1", "1.2.840.10045.3.1.7", b"06082a8648ce3d030107", 256),
    ("frp256v1", "1.2.250.1.223.101.256.1", b"060a2a817a01815f65820001", 256),
    ("secp384r1", "1.3.132.0.34", b"06052b81040022", 384),
    # ("prime384v1", "1.3.132.0.34", b"06052b81040022", 384),
    # ("ansiX9p384r1", "1.3.132.0.34", b"06052b81040022", 384),
    ("prime521v1", "1.3.132.0.35", b"06052b81040023", 521),
    ("secp521r1", "1.3.132.0.35", b"06052b81040023", 521),
    ("nistp521", "1.3.132.0.35", b"06052b81040023", 521),
    ("brainpoolP192r1", "1.3.36.3.3.2.8.1.1.3", b"06092b2403030208010103", 192),
    ("brainpoolP224r1", "1.3.36.3.3.2.8.1.1.5", b"06092b2403030208010105", 224),
    ("brainpoolP256r1", "1.3.36.3.3.2.8.1.1.7", b"06092b2403030208010107", 256),
    ("brainpoolP320r1", "1.3.36.3.3.2.8.1.1.9", b"06092b2403030208010109", 320),
    ("brainpoolP384r1", "1.3.36.3.3.2.8.1.1.11", b"06092b240303020801010b", 384),
    ("brainpoolP512r1", "1.3.36.3.3.2.8.1.1.13", b"06092b240303020801010d", 512),
    ("secp192k1", "1.3.132.0.31", b"06052b8104001f", 192),
    ("secp256k1", "1.3.132.0.10", b"06052b8104000a", 256),
    # NOTE(review): shares the secp521r1 OID and is therefore never the first
    # match; verify that this entry is intended.
    ("secp521k1", "1.3.132.0.35", b"06052b81040023", 521),
    # Fixed: was a str literal, which never compares equal to hexlify() output.
    # NOTE(review): the dotted OIDs of the two 25519 entries look truncated;
    # they are not used for matching, confirm against the upstream table.
    ("edwards25519", "1.3.6.1.4.1159.15.1", b"130c656477617264733235353139", 255),
    ("curve25519", "1.3.6.1.4.3029.1.5.1", b"130b63757276653235353139", 255),
]
def _translate_to_ec_key_data(ec_params):
    """Look up ``ec_params`` (hexlified EC parameters) in ``_ec_curve_info``.

    Returns the first tuple whose third field equals ``ec_params``, or None
    when no entry matches.
    """
    matches = (entry for entry in _ec_curve_info if entry[2] == ec_params)
    return next(matches, None)
# ECDSA signatures come from the card as raw r||s; to re-encode them we need r and s separately
def decode_RS_signature(data):
    """Split a raw ``r || s`` signature into its two halves.

    Args:
        data: any sequence of byte values (bytes, bytearray, list of ints, ...).

    Returns:
        ``(r, s)`` as two bytearrays. For an odd length the first half gets
        the extra byte.
    """
    raw = bytearray(data)
    # Integer midpoint, rounded up, instead of a float compared per byte.
    midpoint = (len(raw) + 1) // 2
    return raw[:midpoint], raw[midpoint:]
# Token representation for use in signing.
# signxml can take it as a key because it implements sign() and the public key presentation.
# In addition it loads certificate data, either as the signing certificate alone or as the signing certificate plus its CA chain.
class PKCS11Token:
def __init__(self):
# session for interacton with the card
self._session = None
# id of key read from private key
self._keyid = None
# type of key
self._key_type = None
# private key reference
self._private_key = None
# signing certificate
self._certificate = None
# certificate chain containing signing certificate and its ca chain
self._ca_chain = None
# operations supported by the card
# they are separated in operation groups (DIGEST,VERIFY,SIGN,ENCRYPT,DECRYPT)
self._operations = None
# does user need to be logged in to use session
self._login_required = False
# public key data for EC key required by signxml public key serialisation
# and for signature recoding (i think it can be sidestepped by proper signature encoding)
self.key_size = None
# get private key reference and get key type and keyid for it
def _get_private_key(self, session):
    """Store the first private key object with its type, id and EC key size.

    ``key_size`` is set only when the key's EC parameters match an entry of
    ``_ec_curve_info``.

    Raises:
        LookupError: if the token exposes no private key object.
    """
    if session is None:
        return
    handles = session.findObjects(
        [(PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY)]
    )
    if not handles:
        raise LookupError("no private key object found on the token")
    self._private_key = handles[0]
    key_type, key_id, ec_params = session.getAttributeValue(
        self._private_key,
        [PyKCS11.CKA_KEY_TYPE, PyKCS11.CKA_ID, PyKCS11.CKA_EC_PARAMS],
    )
    self._key_type = key_type
    self._keyid = bytes(key_id)
    # A non-EC key has no EC parameters; presumably the value is then empty
    # or None (TODO confirm against PyKCS11), and bytearray(None) would raise.
    if ec_params:
        curve_object = _translate_to_ec_key_data(
            binascii.hexlify(bytearray(ec_params))
        )
        if curve_object is not None:
            self.key_size = curve_object[3]
def _get_cert(self, session):
    """Store, PEM-encoded, the certificate whose id matches the private key.

    Objects whose attributes cannot be read are skipped; if several
    certificates match, the last readable one is kept.
    """
    if session is None:
        return
    wanted = [PyKCS11.CKA_VALUE]
    template = [
        (PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE),
        (PyKCS11.CKA_ID, self._keyid),
    ]
    for handle in session.findObjects(template):
        try:
            values = session.getAttributeValue(handle, wanted)
        except PyKCS11.PyKCS11Error:
            continue
        by_attribute = dict(zip(wanted, values))
        der = bytes(by_attribute[PyKCS11.CKA_VALUE])
        parsed = x509.load_der_x509_certificate(der, backend=default_backend())
        self._certificate = parsed.public_bytes(
            encoding=serialization.Encoding.PEM
        )
def _get_cert_with_ca_chain(self, session):
    """Store every readable certificate on the token as concatenated PEM."""
    if session is None:
        return
    template = [(PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE)]
    pem_blocks = []
    for handle in session.findObjects(template):
        try:
            values = session.getAttributeValue(handle, [PyKCS11.CKA_VALUE])
        except PyKCS11.PyKCS11Error:
            continue
        parsed = x509.load_der_x509_certificate(
            bytes(values[0]), backend=default_backend()
        )
        pem = parsed.public_bytes(encoding=serialization.Encoding.PEM)
        pem_blocks.append(pem)
    self._ca_chain = b"".join(pem_blocks)
# Open session with the card
# Uses pin if needed, reads permited operations(mechanisms)
# Loads private key reference, prepares public key info and
# Loads signing certificate and its ca chain if present
def open_session(self, pksc11_lib, token_label, pin):
    """Open a session on a token and load its mechanisms, key and certificates.

    Args:
        pksc11_lib: PKCS#11 module path handed to ``PyKCS11Lib.load``.
        token_label: label of the token to use. ``None`` accepts any present
            token; the last one listed is used.
        pin: user PIN, passed to ``login`` only when the selected token
            reports ``CKF_LOGIN_REQUIRED``.

    Returns:
        ``True`` if a session is open afterwards, ``False`` otherwise.
    """
    library = PyKCS11.PyKCS11Lib()
    library.load(pksc11_lib)

    slot = None
    self._login_required = False
    for candidate in library.getSlotList(tokenPresent=True):
        # Token info is addressed by slot id, not by position in the list.
        info = library.getTokenInfo(candidate)
        label_matches = info.label.strip() == token_label
        if token_label is None or label_matches:
            slot = candidate
            # Track the login flag of the token that is actually selected.
            self._login_required = (
                info.flags & PyKCS11.CKF_LOGIN_REQUIRED
            ) != 0
        if label_matches:
            break

    if slot is not None:
        self._session = library.openSession(
            slot, PyKCS11.CKF_SERIAL_SESSION | PyKCS11.CKF_RW_SESSION
        )
        if self._session is not None:
            if self._login_required:
                self._session.login(pin)
            self._operations = {}
            for mech in library.getMechanismList(slot):
                # Query the selected slot rather than a hard-coded slot 0.
                mech_info = library.getMechanismInfo(slot, mech)
                short_name = str(mech).replace("CKM_", "")
                for flag, flag_name in mech_info.flags_dict.items():
                    if mech_info.flags & flag == 0:
                        continue
                    operation = flag_name.replace("CKF_", "")
                    try:
                        if operation == "DIGEST":
                            algorithm = DigestAlgorithm[short_name]
                        elif operation in ("SIGN", "VERIFY"):
                            algorithm = SignatureMethod[short_name]
                        else:
                            continue
                    except KeyError:
                        # Mechanism has no signxml counterpart; ignore it.
                        continue
                    self._operations.setdefault(operation, {})[
                        algorithm
                    ] = PyKCS11.CKM[mech]
            # Assumption: each token has one private key and may have
            # several public keys and certificates.
            self._get_private_key(self._session)
            self._get_cert(self._session)
            self._get_cert_with_ca_chain(self._session)
    return self._session is not None
# signxml interface
# sign data on the card using provided signature_method:SignatureMethod
def sign(self, data, **argv):
    """Sign ``data`` on the token and return the signature as bytes.

    Args:
        data: bytes to sign.
        **argv: keyword arguments of the caller's ``key.sign`` call. Only
            ``signature_algorithm`` is inspected here: a ``SignatureMethod``
            is used directly; any other object is assumed to expose
            ``.algorithm.name`` with the hash name (presumably a
            cryptography ``ec.ECDSA`` instance - confirm against the signxml
            version in use) and is mapped to ``ECDSA_<HASH>``, falling back
            to ``ECDSA_SHA256`` as before.

    Returns:
        DER-encoded ``(r, s)`` for DSA/ECDSA keys, the raw signature bytes
        otherwise, or ``None`` when no session or key is loaded.
    """
    if (
        self._session is None
        or self._private_key is None
        or self._key_type is None
    ):
        return None

    method = None
    if "signature_algorithm" in argv:
        requested = argv["signature_algorithm"]
        if isinstance(requested, SignatureMethod):
            method = requested
        else:
            hash_alg = getattr(requested, "algorithm", None)
            hash_name = str(getattr(hash_alg, "name", ""))
            try:
                method = SignatureMethod[
                    "ECDSA_" + hash_name.upper().replace("-", "_")
                ]
            except KeyError:
                # Unknown or missing hash: keep the previous default.
                method = SignatureMethod.ECDSA_SHA256

    mechanism = None
    # The "SIGN" group is absent when no token mechanism matched a method.
    sign_mechanisms = self._operations.get("SIGN", {})
    if method is not None and method in sign_mechanisms:
        mechanism = PyKCS11.Mechanism(sign_mechanisms[method])

    if mechanism is None:
        # No matching mechanism: let PyKCS11 use its default.
        sig = self._session.sign(self._private_key, data)
    else:
        sig = self._session.sign(self._private_key, data, mechanism)

    if self._key_type in (PyKCS11.CKK_ECDSA, PyKCS11.CKK_DSA):
        # The token returns raw r||s, while the caller decodes a DER
        # sequence of two integers, so re-encode here. To be removed once
        # the caller accepts raw signatures.
        r, s = decode_RS_signature(sig)
        return utils.encode_dss_signature(
            int.from_bytes(r, "big"), int.from_bytes(s, "big")
        )
    # PyKCS11.CKK_RSA
    return bytes(sig)
# Calculate digest with provided algorithm
# return None if algorithm is not supported
# with modified signxml signing procedure the same token can calculate a digest if algorithm is supported
def get_digest(self, data, algorithm: DigestAlgorithm):
    """Compute a digest of ``data`` on the token.

    Args:
        data: bytes to hash.
        algorithm: requested ``DigestAlgorithm``.

    Returns:
        Whatever ``session.digest`` returns for the matching mechanism, or
        ``None`` when no session is open or the token offers no mechanism
        for ``algorithm``.
    """
    if self._session is None:
        return None
    # The "DIGEST" group only exists when at least one token mechanism
    # matched a DigestAlgorithm name, so do not index it blindly.
    digest_mechanisms = self._operations.get("DIGEST", {})
    if algorithm not in digest_mechanisms:
        return None
    mechanism = PyKCS11.Mechanism(digest_mechanisms[algorithm])
    return self._session.digest(data, mechanism)
def certificate(self):
    """Return the cached signing certificate (``self._certificate``)."""
    signing_cert = self._certificate
    return signing_cert
def certificate_with_ca_chain(self):
    """Return the cached concatenated certificates (``self._ca_chain``)."""
    chain = self._ca_chain
    return chain
# Close the open session (logging out first when a login was required)
def close(self):
    """Log out if a login was required, then close the token session.

    Does nothing when no session is open. The session is always closed and
    forgotten, even when ``logout`` raises; that error still propagates.
    """
    session = self._session
    if session is None:
        return
    # Forget the session first so a failure below cannot leave a stale
    # reference behind for later calls.
    self._session = None
    try:
        if self._login_required:
            session.logout()
    finally:
        session.closeSession()
There is still room for improvement, but it is a start. Thank you @Brandhor for the kick in the right direction. Using an EC key woke up some bugs that are now getting fixed, and maybe there will be a proper solution in the future.
The RSA mechanism names reported by the card are special, so I added a translation that produces the correct mechanism. The translation yields the same mechanism that is used in the @Brandhor example.
import PyKCS11
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import utils
from signxml.algorithms import SignatureMethod, DigestAlgorithm
import binascii
# Translate the mechanism names reported by the card to SignatureMethod names (whether they really are the same, only time will tell)
# PKCS#11 mechanism names (without the "CKM_" prefix) mapped to the names used
# for lookup in SignatureMethod: the card puts the hash first
# ("<HASH>_RSA_PKCS"), the target name puts "RSA" first ("RSA_<HASH>").
_pkcs_mech_names = {"RSA_PKCS": "RSA"}
_pkcs_mech_names.update(
    (hash_name + "_RSA_PKCS", "RSA_" + hash_name)
    for hash_name in (
        "SHA1",
        "SHA224",
        "SHA256",
        "SHA384",
        "SHA512",
        "MD5",
        "RIPEMD160",
    )
)
# taken from pkcs11-tool.c
# Known EC curves as (name, dotted OID, hex-encoded EC parameters, key size in
# bits). The third column is compared against binascii.hexlify() output, so
# every entry must be lowercase *bytes*. Lookups return the first matching
# row, so for parameters shared by several names the first name listed wins.
_ec_curve_info = [
    ("secp192r1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("prime192v1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("prime192v2", "1.2.840.10045.3.1.2", b"06082a8648ce3d030102", 192),
    ("prime192v3", "1.2.840.10045.3.1.3", b"06082a8648ce3d030103", 192),
    ("nistp192", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("ansiX9p192r1", "1.2.840.10045.3.1.1", b"06082a8648ce3d030101", 192),
    ("secp224r1", "1.3.132.0.33", b"06052b81040021", 224),
    ("nistp224", "1.3.132.0.33", b"06052b81040021", 224),
    ("prime256v1", "1.2.840.10045.3.1.7", b"06082a8648ce3d030107", 256),
    ("secp256r1", "1.2.840.10045.3.1.7", b"06082a8648ce3d030107", 256),
    # Fixed typo: the encoding of 1.2.840.10045.3.1.7 starts 06 08 2a, not 2b.
    ("ansiX9p256r1", "1.2.840.10045.3.1.7", b"06082a8648ce3d030107", 256),
    ("frp256v1", "1.2.250.1.223.101.256.1", b"060a2a817a01815f65820001", 256),
    ("secp384r1", "1.3.132.0.34", b"06052b81040022", 384),
    # ("prime384v1", "1.3.132.0.34", b"06052b81040022", 384),
    # ("ansiX9p384r1", "1.3.132.0.34", b"06052b81040022", 384),
    ("prime521v1", "1.3.132.0.35", b"06052b81040023", 521),
    ("secp521r1", "1.3.132.0.35", b"06052b81040023", 521),
    ("nistp521", "1.3.132.0.35", b"06052b81040023", 521),
    ("brainpoolP192r1", "1.3.36.3.3.2.8.1.1.3", b"06092b2403030208010103", 192),
    ("brainpoolP224r1", "1.3.36.3.3.2.8.1.1.5", b"06092b2403030208010105", 224),
    ("brainpoolP256r1", "1.3.36.3.3.2.8.1.1.7", b"06092b2403030208010107", 256),
    ("brainpoolP320r1", "1.3.36.3.3.2.8.1.1.9", b"06092b2403030208010109", 320),
    (
        "brainpoolP384r1",
        "1.3.36.3.3.2.8.1.1.11",
        b"06092b240303020801010b",
        384,
    ),
    (
        "brainpoolP512r1",
        "1.3.36.3.3.2.8.1.1.13",
        b"06092b240303020801010d",
        512,
    ),
    ("secp192k1", "1.3.132.0.31", b"06052b8104001f", 192),
    ("secp256k1", "1.3.132.0.10", b"06052b8104000a", 256),
    ("secp521k1", "1.3.132.0.35", b"06052b81040023", 521),
    # For the two 25519 curves the parameters are the curve name as a
    # PrintableString (tag 0x13) rather than an OID. The edwards25519 entry
    # used to be a str, which can never equal the bytes being looked up.
    (
        "edwards25519",
        "1.3.6.1.4.1.11591.15.1",
        b"130c656477617264733235353139",
        255,
    ),
    ("curve25519", "1.3.6.1.4.1.3029.1.5.1", b"130b63757276653235353139", 255),
]
def _translate_to_ec_key_data(ec_params):
    """Return the first ``_ec_curve_info`` row matching ``ec_params``.

    ``ec_params`` is compared for equality against the third column of each
    row (the hex-encoded EC parameters). Returns ``None`` when nothing
    matches.
    """
    matches = (row for row in _ec_curve_info if ec_params == row[2])
    return next(matches, None)
# Decode EC point x and y for public key description
def decode_ec_point(keylen, data):
    """Decode an uncompressed EC point octet string into ``(x, y)``.

    Args:
        keylen: length of one coordinate in bytes.
        data: ``b"\\x00"`` or ``0x04 || X || Y`` with X and Y each
            ``keylen`` bytes long.

    Returns:
        ``(None, None)`` for ``b"\\x00"``, otherwise the coordinates as
        big-endian integers.

    Raises:
        ValueError: if the point does not start with ``0x04`` or has the
            wrong length.
    """
    if data == b"\x00":
        return None, None
    if not data.startswith(b"\x04"):
        raise ValueError("Unsupported elliptic curve point type")
    if len(data) != 2 * keylen + 1:
        raise ValueError("Invalid elliptic curve point data length")
    x_end = keylen + 1
    x_coord = int.from_bytes(data[1:x_end], "big")
    y_coord = int.from_bytes(data[x_end:], "big")
    return (x_coord, y_coord)
# ECDSA signatures come from the card RS-encoded (r followed by s); to re-encode them we need r and s separately
def decode_RS_signature(data):
    """Split a raw ``r || s`` signature into its two halves.

    Args:
        data: the signature as bytes, bytearray or a sequence of byte values.

    Returns:
        ``(r, s)`` as two ``bytearray`` objects. For odd-length input the
        extra byte goes to ``r``, matching the previous behavior.
    """
    raw = bytes(data)
    # Integer split point; rounding up keeps odd-length input compatible.
    split = (len(raw) + 1) // 2
    return bytearray(raw[:split]), bytearray(raw[split:])
# public key information presentation for signxml
class RSAPublicNumbers:
    """Plain holder for the two values passed in as ``n`` and ``e``."""

    def __init__(self, n, e):
        self.n, self.e = n, e
class ECDSAPublicNumbers:
    """Plain holder for the two values passed in as ``x`` and ``y``."""

    def __init__(self, x, y):
        self.x, self.y = x, y
class RSAPublicKey:
    """Minimal public-key object exposing ``public_numbers()``."""

    def __init__(self, n, e):
        # Built once here; public_numbers() hands out this same object.
        numbers = RSAPublicNumbers(n, e)
        self.pubnumbers = numbers

    def public_numbers(self):
        """Return the ``RSAPublicNumbers`` created at construction."""
        return self.pubnumbers
class ECDSAPublicKey:
    """Minimal public-key object exposing ``public_numbers()``."""

    def __init__(self, x, y):
        # Built once here; public_numbers() hands out this same object.
        numbers = ECDSAPublicNumbers(x, y)
        self.pubnumbers = numbers

    def public_numbers(self):
        """Return the ``ECDSAPublicNumbers`` created at construction."""
        return self.pubnumbers
class ECDSACurve:
    """Curve descriptor that carries only a ``name`` attribute."""

    def __init__(self, name):
        curve_name = name
        self.name = curve_name
# Token representation for use in signing.
# signxml can take it as a key, as it implements sign() and the public key presentation.
# In addition it loads certificate data in the form of the signing certificate, or the signing certificate plus the CA chain.
class PKCS11Token:
def __init__(self):
# session for interacton with the card
self._session = None
# id of key read from private key
self._keyid = None
# type of key
self._key_type = None
# private key reference
self._private_key = None
# public key reference
self._pubkey = None
# signing certificate
self._certificate = None
# certificate chain containing signing certificate and its ca chain
self._ca_chain = None
# operations supported by the card
# they are separated in operation groups (DIGEST,VERIFY,SIGN,ENCRYPT,DECRYPT)
self._operations = None
# does user need to be logged in to use session
self._login_required = False
# public key data for EC key required by signxml public key serialisation
self.curve = None
self.key_size = None
# get private key reference and get key type and keyid for it
def _get_private_key(self, session):
    """Find the token's private key and cache its handle, type and id.

    Takes the first private-key object the session returns and reads its
    ``CKA_KEY_TYPE`` and ``CKA_ID``. Does nothing when ``session`` is
    ``None``.
    """
    if session is None:
        return
    private_keys = session.findObjects(
        [(PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY)]
    )
    # First match only: one private key per token is assumed.
    self._private_key = private_keys[0]
    wanted = [PyKCS11.CKA_KEY_TYPE, PyKCS11.CKA_ID]
    values = session.getAttributeValue(self._private_key, wanted)
    self._key_type = values[0]
    self._keyid = bytes(values[1])
def _get_public_key(self, session):
    """Build the public-key object signxml needs from the token's public key.

    Looks up the public-key object sharing the private key's ``CKA_ID``.
    For EC keys it sets ``curve``, ``key_size`` and an ``ECDSAPublicKey``;
    for any other key type it reads modulus and exponent into an
    ``RSAPublicKey``. ``_pubkey`` stays ``None`` when the session or key
    type is missing, when the token has no matching public-key object, or
    when the EC curve is not in the known-curve table.

    This is only needed as long as the signer demands public key data; if it
    serialized the certificate as X509Data instead, it would not be needed.

    Raises:
        ValueError: if the EC point is not a valid uncompressed point for
            the detected curve.
    """
    if session is None or self._key_type is None:
        return
    matches = session.findObjects(
        [
            (PyKCS11.CKA_CLASS, PyKCS11.CKO_PUBLIC_KEY),
            (PyKCS11.CKA_ID, self._keyid),
        ]
    )
    if not matches:
        # No public-key object with this id on the token.
        return
    pubkey = matches[0]

    if self._key_type == PyKCS11.CKK_ECDSA:  # EC key
        ec_attrs = session.getAttributeValue(
            pubkey,
            [PyKCS11.CKA_EC_POINT, PyKCS11.CKA_EC_PARAMS],
        )
        curve_object = _translate_to_ec_key_data(
            binascii.hexlify(bytearray(ec_attrs[1]))
        )
        if curve_object is None:
            # Unknown curve: the coordinate size cannot be derived.
            return
        self.curve = ECDSACurve(curve_object[0])
        self.key_size = curve_object[3]
        # Bytes per coordinate, rounded up (521 bits -> 66 bytes).
        coordinate_len = (self.key_size + 7) // 8
        point = bytes(ec_attrs[0])
        # PKCS#11 defines CKA_EC_POINT as the DER encoding of the point,
        # i.e. an OCTET STRING (tag 0x04 + length) wrapped around
        # 0x04 || X || Y. Strip that header unless the token already
        # returned the bare point.
        if (
            len(point) != 2 * coordinate_len + 1
            and len(point) > 2
            and point[0] == 0x04
        ):
            if point[1] < 0x80:
                header_len = 2  # short-form length
            else:
                header_len = 2 + (point[1] & 0x7F)  # long-form length
            point = point[header_len:]
        x, y = decode_ec_point(coordinate_len, point)
        self._pubkey = ECDSAPublicKey(x, y)
    else:
        # PyKCS11.CKK_RSA
        rsa_attrs = session.getAttributeValue(
            pubkey,
            [PyKCS11.CKA_MODULUS, PyKCS11.CKA_PUBLIC_EXPONENT],
        )
        modulus = int.from_bytes(bytes(rsa_attrs[0]), "big")
        exponent = int.from_bytes(bytes(rsa_attrs[1]), "big")
        self._pubkey = RSAPublicKey(modulus, exponent)
def _get_cert(self, session):
    """Cache the certificate that shares the private key's id, as PEM.

    Certificate objects whose ``CKA_ID`` equals ``self._keyid`` are parsed
    from DER and stored in ``self._certificate`` in PEM form; unreadable
    objects are skipped. Does nothing when ``session`` is ``None``.
    """
    if session is None:
        return
    cert_handles = session.findObjects(
        [
            (PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE),
            (PyKCS11.CKA_ID, self._keyid),
        ]
    )
    requested = [PyKCS11.CKA_VALUE]
    for cert_handle in cert_handles:
        try:
            fetched = session.getAttributeValue(cert_handle, requested)
        except PyKCS11.PyKCS11Error:
            # Skip objects whose value cannot be read.
            continue
        by_attribute = dict(zip(requested, fetched))
        loaded = x509.load_der_x509_certificate(
            bytes(by_attribute[PyKCS11.CKA_VALUE]), backend=default_backend()
        )
        # Later matches overwrite earlier ones.
        self._certificate = loaded.public_bytes(
            encoding=serialization.Encoding.PEM
        )
def _get_cert_with_ca_chain(self, session):
    """Cache every certificate on the token as one concatenated PEM blob.

    Certificates are taken in the order the session returns them, without
    sorting or validating a chain; unreadable objects are skipped. Does
    nothing when ``session`` is ``None``.
    """
    if session is None:
        return
    certificate_filter = [(PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE)]
    collected = []
    for cert_handle in session.findObjects(certificate_filter):
        try:
            fetched = session.getAttributeValue(
                cert_handle, [PyKCS11.CKA_VALUE]
            )
        except PyKCS11.PyKCS11Error:
            continue
        loaded = x509.load_der_x509_certificate(
            bytes(fetched[0]), backend=default_backend()
        )
        collected.append(
            loaded.public_bytes(encoding=serialization.Encoding.PEM)
        )
    self._ca_chain = b"".join(collected)
# Open a session with the card.
# Uses the PIN if needed and reads the permitted operations (mechanisms).
# Loads the private key reference, prepares the public key info and
# loads the signing certificate and its CA chain if present.
def open_session(self, pksc11_lib, token_label, pin):
    """Open a session on a token and load its mechanisms, keys and certificates.

    Args:
        pksc11_lib: PKCS#11 module path handed to ``PyKCS11Lib.load``.
        token_label: label of the token to use. ``None`` accepts any present
            token; the last one listed is used.
        pin: user PIN, passed to ``login`` only when the selected token
            reports ``CKF_LOGIN_REQUIRED``.

    Returns:
        ``True`` if a session is open afterwards, ``False`` otherwise.
    """
    library = PyKCS11.PyKCS11Lib()
    library.load(pksc11_lib)

    slot = None
    self._login_required = False
    for candidate in library.getSlotList(tokenPresent=True):
        # Token info is addressed by slot id, not by position in the list.
        info = library.getTokenInfo(candidate)
        label_matches = info.label.strip() == token_label
        if token_label is None or label_matches:
            slot = candidate
            # Track the login flag of the token that is actually selected.
            self._login_required = (
                info.flags & PyKCS11.CKF_LOGIN_REQUIRED
            ) != 0
        if label_matches:
            break

    if slot is not None:
        self._session = library.openSession(
            slot, PyKCS11.CKF_SERIAL_SESSION | PyKCS11.CKF_RW_SESSION
        )
        if self._session is not None:
            if self._login_required:
                self._session.login(pin)
            self._operations = {}
            for mech in library.getMechanismList(slot):
                # Query the selected slot rather than a hard-coded slot 0.
                mech_info = library.getMechanismInfo(slot, mech)
                short_name = str(mech).replace("CKM_", "")
                # RSA mechanisms are named differently on the card; map them
                # to the names used for the SignatureMethod lookup.
                short_name = _pkcs_mech_names.get(short_name, short_name)
                for flag, flag_name in mech_info.flags_dict.items():
                    if mech_info.flags & flag == 0:
                        continue
                    operation = flag_name.replace("CKF_", "")
                    try:
                        if operation == "DIGEST":
                            algorithm = DigestAlgorithm[short_name]
                        elif operation in ("SIGN", "VERIFY"):
                            algorithm = SignatureMethod[short_name]
                        else:
                            continue
                    except KeyError:
                        # Mechanism has no signxml counterpart; ignore it.
                        continue
                    self._operations.setdefault(operation, {})[
                        algorithm
                    ] = PyKCS11.CKM[mech]
            # Assumption: each token has one private key and may have
            # several public keys and certificates.
            self._get_private_key(self._session)
            self._get_public_key(self._session)
            self._get_cert(self._session)
            self._get_cert_with_ca_chain(self._session)
    return self._session is not None
# signxml interface
# sign data on the card using provided signature_method:SignatureMethod
def sign(self, data, **argv):
    """Sign ``data`` on the token and return the signature as bytes.

    Args:
        data: bytes to sign.
        **argv: keyword arguments of the caller's ``key.sign`` call.
            ``signature_algorithm`` may be a ``SignatureMethod`` (used
            directly) or an object exposing ``.algorithm.name`` with the
            hash name; without it, ``algorithm.name`` is consulted and an
            RSA method is chosen. (The argument shapes are presumed to be
            cryptography's ``ec.ECDSA`` / hash objects - confirm against
            the signxml version in use.) When no hash can be derived the
            previous defaults apply: ``ECDSA_SHA256`` with
            ``signature_algorithm``, ``RSA_SHA256`` without.

    Returns:
        DER-encoded ``(r, s)`` for DSA/ECDSA keys, the raw signature bytes
        otherwise, or ``None`` when no session or key is loaded.
    """
    if (
        self._session is None
        or self._private_key is None
        or self._key_type is None
    ):
        return None

    requested = argv.get("signature_algorithm")
    if isinstance(requested, SignatureMethod):
        method = requested
    else:
        if "signature_algorithm" in argv:
            family = "ECDSA"
            hash_alg = getattr(requested, "algorithm", None)
        else:
            family = "RSA"
            hash_alg = argv.get("algorithm")
        hash_name = str(getattr(hash_alg, "name", ""))
        try:
            method = SignatureMethod[
                family + "_" + hash_name.upper().replace("-", "_")
            ]
        except KeyError:
            # Unknown or missing hash: keep the previous SHA-256 default.
            method = SignatureMethod[family + "_SHA256"]

    mechanism = None
    # The "SIGN" group is absent when no token mechanism matched a method.
    sign_mechanisms = self._operations.get("SIGN", {})
    if method in sign_mechanisms:
        mechanism = PyKCS11.Mechanism(sign_mechanisms[method])

    if mechanism is None:
        # No matching mechanism: let PyKCS11 use its default.
        sig = self._session.sign(self._private_key, data)
    else:
        sig = self._session.sign(self._private_key, data, mechanism)

    if self._key_type in (PyKCS11.CKK_ECDSA, PyKCS11.CKK_DSA):
        # The token returns raw r||s, while the caller decodes a DER
        # sequence of two integers, so re-encode here. To be removed once
        # the caller accepts raw signatures.
        r, s = decode_RS_signature(sig)
        return utils.encode_dss_signature(
            int.from_bytes(r, "big"), int.from_bytes(s, "big")
        )
    # PyKCS11.CKK_RSA
    return bytes(sig)
# Calculate digest with provided algorithm
# return None if algorithm is not supported
# with modified signxml signing procedure the same token can calculate a digest if algorithm is supported
def get_digest(self, data, algorithm: DigestAlgorithm):
    """Compute a digest of ``data`` on the token.

    Args:
        data: bytes to hash.
        algorithm: requested ``DigestAlgorithm``.

    Returns:
        Whatever ``session.digest`` returns for the matching mechanism, or
        ``None`` when no session is open or the token offers no mechanism
        for ``algorithm``.
    """
    if self._session is None:
        return None
    # The "DIGEST" group only exists when at least one token mechanism
    # matched a DigestAlgorithm name, so do not index it blindly.
    digest_mechanisms = self._operations.get("DIGEST", {})
    if algorithm not in digest_mechanisms:
        return None
    mechanism = PyKCS11.Mechanism(digest_mechanisms[algorithm])
    return self._session.digest(data, mechanism)
def public_key(self):
    """Return the cached public-key object (``self._pubkey``)."""
    cached_key = self._pubkey
    return cached_key
def certificate(self):
    """Return the cached signing certificate (``self._certificate``)."""
    signing_cert = self._certificate
    return signing_cert
def certificate_with_ca_chain(self):
    """Return the cached concatenated certificates (``self._ca_chain``)."""
    chain = self._ca_chain
    return chain
# Close the open session (logging out first when a login was required)
def close(self):
    """Log out if a login was required, then close the token session.

    Does nothing when no session is open. The session is always closed and
    forgotten, even when ``logout`` raises; that error still propagates.
    """
    session = self._session
    if session is None:
        return
    # Forget the session first so a failure below cannot leave a stale
    # reference behind for later calls.
    self._session = None
    try:
        if self._login_required:
            session.logout()
    finally:
        session.closeSession()