Parsing Certificate Transparency Logs

Several years ago, Ryan Sears wrote a very good blog post on parsing Certificate Transparency logs. The links are here

He also wrote a very handy tool called Axeman, which still works to this day with some minor Python version tweaks. Emphasis on the word WORKS. The main thing is to get compatible versions of Python and the "construct" package working together: use a slightly older version of Python and, more importantly, a version of the "construct" package that still supports the "Embedded" keyword.

The Itch

While I had Axeman running, I thought it would take only a line or two of changes to make it work with the latest versions. The itch in me took over for no good reason.

What could have been solved by simply ignoring the version requirements led me down a path of immense pain and learning.

I rewrote the parsing structs in Python.

So, for all the readers, the structs for parsing the CT log Merkle tree entries are below. They use Python's built-in struct module instead of construct, which I find more usable.

import struct
from enum import Enum
from OpenSSL import crypto 


class LogEntryType(Enum):
    uninitialized = -1  # Not set
    X509LogEntryType = 0
    PrecertLogEntryType = 1

# MerkleTreeHeader = Struct(
#     "Version"         / Byte,
#     "MerkleLeafType"  / Byte,
#     "Timestamp"       / Int64ub,
#     "LogEntryType"    / Enum(Int16ub, X509LogEntryType=0, PrecertLogEntryType=1),
#     "Entry"           / GreedyBytes
# )
class MerkleTreeParser:
    Version = 0
    MerkleLeafType = 0
    Timestamp = 0
    LogEntryType = LogEntryType.uninitialized
    Entry = b''
    
    def __init__(self, data):
        # version (1 byte), leaf type (1 byte), timestamp in ms (8 bytes), entry type (2 bytes)
        FORMAT = '>BBQH'
        header_size = struct.calcsize(FORMAT)  # 12 bytes
        unpacked = struct.unpack(FORMAT, data[:header_size])
        self.Version = unpacked[0]
        self.MerkleLeafType = unpacked[1]
        self.Timestamp = unpacked[2]
        self.LogEntryType = LogEntryType(unpacked[3])
        self.Entry = data[header_size:]
    
    def __str__(self) -> str:
        return f"Version: {self.Version}, MerkleLeafType: {self.MerkleLeafType}, Timestamp: {self.Timestamp}, LogEntryType: {self.LogEntryType}, Entry: {self.Entry}"

# Certificate = Struct(
#     "Length" / Int24ub,
#     "CertData" / Bytes(this.Length)
# )
        
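class Certificate:
    Length = 0
    CertData = b''

    def __init__(self, data):
        if len(data) == 0:
            return
        # 3-byte big-endian length prefix; pad to 4 bytes so '>I' can unpack it
        self.Length = struct.unpack('>I', b'\x00' + data[:3])[0]
        # Take exactly Length bytes, like Bytes(this.Length) in the construct version,
        # instead of everything that follows
        self.CertData = data[3:3 + self.Length]

    def __str__(self) -> str:
        return f"Length: {self.Length}, CertData: {self.CertData}"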

# CertificateChain = Struct(
#     "ChainLength" / Int24ub,
#     "Chain" / GreedyRange(Certificate),
# )

class CertificateChain:
    ChainLength: int = 0

    def __init__(self, data):
        # Per-instance list: a class-level `Chain = []` is shared by every instance,
        # so certificates from earlier entries would leak into later ones
        self.Chain: list = []
        if len(data) == 0:
            return
        FORMAT = '>I'
        # Total byte length of the certificate list that follows
        self.ChainLength = struct.unpack(FORMAT, b'\x00' + data[:3])[0]
        data = data[3:]

        while len(data) > 0:
            # Each entry is a 3-byte length followed by that many DER bytes
            length = struct.unpack(FORMAT, b'\x00' + data[:3])[0]
            self.Chain.append(Certificate(data[:3 + length]))
            data = data[3 + length:]

    def __str__(self) -> str:
        return f"ChainLength: {self.ChainLength}, Chain: {[str(c) for c in self.Chain]}"
    
# PreCertEntry = Struct(
#     "LeafCert" / Certificate,
#     Embedded(CertificateChain),
#     Terminated
# )
class PreCertEntry:
    def __init__(self, data):
        # Per-instance defaults instead of objects shared at class level
        self.LeafCert = Certificate(b'')
        self.Chain = CertificateChain(b'')
        if len(data) == 0:
            return
        FORMAT = '>I'
        leafcert_length = struct.unpack(FORMAT, b'\x00' + data[:3])[0]
        self.LeafCert = Certificate(data[:3 + leafcert_length])
        # Everything after the leaf certificate is the embedded CertificateChain
        self.Chain = CertificateChain(data[3 + leafcert_length:])

    def __str__(self) -> str:
        return f"LeafCert: {self.LeafCert}, Chain: {self.Chain}"
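To make the byte layout behind these classes concrete, here is a small self-contained sketch. It assumes the RFC 6962 leaf format (a 12-byte header followed by a 3-byte-length-prefixed certificate and a 2-byte extensions field); `build_x509_leaf` and `read_uint24` are hypothetical helpers made up for illustration, and the "certificate" is placeholder bytes, not real DER. It also shows why the certificate has to be sliced to its length rather than to the end of the buffer: the extensions bytes follow it.

```python
import struct

# RFC 6962 MerkleTreeLeaf header, all big-endian: version (1 byte),
# leaf_type (1 byte), timestamp in ms (8 bytes), entry_type (2 bytes)
HEADER_FORMAT = ">BBQH"
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)  # 12


def read_uint24(data: bytes, offset: int = 0) -> int:
    """Read a 3-byte big-endian length, padding to 4 bytes like the Certificate class."""
    return struct.unpack(">I", b"\x00" + data[offset:offset + 3])[0]


def build_x509_leaf(timestamp_ms: int, der: bytes) -> bytes:
    """Hypothetical helper: a synthetic X509LogEntryType leaf (header, ASN.1Cert, empty extensions)."""
    header = struct.pack(HEADER_FORMAT, 0, 0, timestamp_ms, 0)
    asn1_cert = len(der).to_bytes(3, "big") + der
    extensions = (0).to_bytes(2, "big")  # CtExtensions with zero length
    return header + asn1_cert + extensions


leaf = build_x509_leaf(1700000000000, b"fake-der-bytes")
version, leaf_type, timestamp, entry_type = struct.unpack(HEADER_FORMAT, leaf[:HEADER_SIZE])
entry = leaf[HEADER_SIZE:]
cert_len = read_uint24(entry)
cert_der = entry[3:3 + cert_len]  # slice to the length, not to the end
trailing = entry[3 + cert_len:]   # the 2-byte extensions field is left over
print(HEADER_SIZE, timestamp, entry_type, cert_der, trailing)
# prints: 12 1700000000000 0 b'fake-der-bytes' b'\x00\x00'
```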

I leave it to readers to rework Axeman to use these structs. It's not that difficult. But as I said earlier, Axeman WORKS, so this hardly makes a dent.

The Itch Part II

It worked in Python, and I should have left it there. There was no need for optimization.

After running Axeman for a few hours and doing some secondary research, I felt that Python, as always, is just SLOW and leaks memory. After running the program for about 15 hours, my computer was completely unusable. While writing this post I realized it could have been anything on my computer, but guess what, I blame Python.

So I set out to learn Rust and see how much better it is than Python. I spent around a month getting to grips with Rust and wrote my first program in it.

Here is how to do the same in Rust.

use base64::{prelude::BASE64_STANDARD, Engine};
use std::io::{Cursor, Read};
use byteorder::{BigEndian, ReadBytesExt};
// pub mod utils;  // module not shown in this post; nothing below uses it

#[repr(u8)]
#[derive(Debug, PartialEq)]
pub enum ELogEntryType {
    UnInitialized,
    X509LogEntryType,
    PrecertLogEntryType,
}
//     # MerkleTreeHeader = Struct(
//     #     "Version"         / Byte,
//     #     "MerkleLeafType"  / Byte,
//     #     "Timestamp"       / Int64ub,
//     #     "LogEntryType"    / Enum(Int16ub, X509LogEntryType=0, PrecertLogEntryType=1),
//     #     "Entry"           / GreedyBytes
//     # )
pub struct MerkleTreeHeader {
    pub version: u8,
    pub merkle_leaf_type: u8,
    pub timestamp: u64,
    pub log_entry_type: ELogEntryType,
    pub entry: Vec<u8>,
}
impl MerkleTreeHeader {
    pub fn new(data:&Vec<u8>) -> MerkleTreeHeader {
        let mut header_bytes = Cursor::new(data);
        let version = header_bytes.read_u8().unwrap();
        let merkle_leaf_type = header_bytes.read_u8().unwrap();
        let timestamp = header_bytes.read_u64::<BigEndian>().unwrap();
        let log_entry_type = match header_bytes.read_u16::<BigEndian>().unwrap() {
            0 => ELogEntryType::X509LogEntryType,
            1 => ELogEntryType::PrecertLogEntryType,
            _ => ELogEntryType::UnInitialized,
        };
        let mut entry = Vec::new();
        header_bytes.read_to_end(&mut entry).unwrap();
        MerkleTreeHeader {
            version,
            merkle_leaf_type,
            timestamp,
            log_entry_type,
            entry,
        }
    }

    pub fn new_b64(data: &str) -> MerkleTreeHeader {
        let header_bytes = BASE64_STANDARD.decode(data).expect("Failed to decode Leaf Header Base64 data.");
        Self::new(&header_bytes)
    }
}
// # Certificate = Struct(
//     #     "Length" / Int24ub,
//     #     "CertData" / Bytes(this.Length)
//     # )
pub struct Certificate {
    pub length: u32,
    pub cert_data: Vec<u8>,
}
impl Certificate {
    pub fn new(data: &Vec<u8>) -> Certificate {
        let mut certificate_bytes = Cursor::new(data);
        // 3-byte big-endian length prefix
        let length = certificate_bytes.read_u24::<BigEndian>().unwrap();
        // Read exactly `length` bytes, like Bytes(this.Length), not the whole remainder
        let mut cert_data = vec![0u8; length as usize];
        certificate_bytes.read_exact(&mut cert_data).unwrap();
        Certificate { length, cert_data }
    }
}
// # CertificateChain = Struct(
//     #     "ChainLength" / Int24ub,
//     #     "Chain" / GreedyRange(Certificate),
//     # )
pub struct CertificateChain {
    pub chain_length: u32,
    pub chain: Vec<Certificate>,
}
impl CertificateChain {
    pub fn new(data: &Vec<u8>) -> CertificateChain {
        let mut cursor = Cursor::new(data);
        // Total byte length of the certificate list that follows
        let chain_length: u32 = cursor.read_u24::<BigEndian>().unwrap();
        let mut chain: Vec<Certificate> = Vec::new();
        let total_length = cursor.get_ref().len() as u64;
        while cursor.position() < total_length {
            // Each entry: 3-byte length, then that many DER bytes
            let cert_length = cursor.read_u24::<BigEndian>().unwrap();
            let mut cert_data = vec![0u8; cert_length as usize];
            cursor.read_exact(&mut cert_data).unwrap();
            chain.push(Certificate { length: cert_length, cert_data });
        }
        CertificateChain { chain_length, chain }
    }
    pub fn new_b64(data: &str) -> CertificateChain {
        let bytes = BASE64_STANDARD.decode(data).expect("Unable to decode Certificate Chain");
        Self::new(&bytes)
    }
}
// # PreCertEntry = Struct(
//     #     "LeafCert" / Certificate,
//     #     Embedded(CertificateChain),
//     #     Terminated
//     # )
pub struct PreCertEntry {
    pub leaf_cert: Certificate,
    pub chain: CertificateChain,
}
impl PreCertEntry {
    pub fn new(data: &Vec<u8>) -> PreCertEntry {
        let mut cursor = Cursor::new(data);
        // Leaf (pre)certificate: 3-byte length, then DER bytes
        let leafcert_length = cursor.read_u24::<BigEndian>().unwrap();
        let mut cert_data = vec![0u8; leafcert_length as usize];
        cursor.read_exact(&mut cert_data).unwrap();
        let leaf_cert = Certificate { length: leafcert_length, cert_data };
        // The remainder is the embedded CertificateChain
        let mut chain_data: Vec<u8> = Vec::new();
        cursor.read_to_end(&mut chain_data).unwrap();
        let chain = CertificateChain::new(&chain_data);
        PreCertEntry { leaf_cert, chain }
    }
    pub fn new_b64(data: &str) -> PreCertEntry {
        let bytes = BASE64_STANDARD.decode(data).expect("Unable to decode PreCertEntry");
        Self::new(&bytes)
    }
}
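One way to sanity-check the Rust port against the Python structs is to feed both the same hand-built bytes. The sketch below (in Python, like the first half of this post) encodes an RFC 6962 `PrecertChainEntry`, a leaf ASN.1Cert followed by a 3-byte-length-prefixed list of ASN.1Certs, and prints it as base64, the form the `new_b64` constructors take. `asn1_cert` and `precert_chain_entry` are hypothetical helpers, and the byte strings are placeholders rather than real certificates, so this only exercises the length-prefix framing.

```python
import base64


def asn1_cert(der: bytes) -> bytes:
    """Hypothetical helper: encode one ASN.1Cert as a 3-byte big-endian length plus the bytes."""
    return len(der).to_bytes(3, "big") + der


def precert_chain_entry(leaf: bytes, chain: list) -> bytes:
    """Hypothetical helper: leaf ASN.1Cert, then a length-prefixed list of ASN.1Certs."""
    chain_bytes = b"".join(asn1_cert(c) for c in chain)
    return asn1_cert(leaf) + len(chain_bytes).to_bytes(3, "big") + chain_bytes


# Placeholder bytes stand in for DER certificates
vector = precert_chain_entry(b"leaf", [b"intermediate", b"root"])
print(base64.b64encode(vector).decode())
```

This input encodes a leaf length of 4, a chain length of 22 and two chain entries (12 and 4 bytes), so both parsers should report those values.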

Thanks

I think this might be my most boring post, but it's my journey of learning something new. So that is it.

I might release my tool and write about its possible uses in the future, but that's for later.
