Trying to write client for heimdall server in Julia

Hi all,

I am currently an application, that is supposed to read and interpret the data coming from the heimdall server over TCP. I took the advice from this post: How to get TCP data (IQ at port: 5000 and Control at port: 5001) from Heimdal_Daq_Fw for other custom application? and wrote a copy of the krakenSDRSource block for GNURadio.

I am pretty sure, that everything on the hardware/heimdall server site is setup correctly, as I can see the data both on GNURadio and also using the heimdall rec tool.

I rebuilt the Block as a Julia module and made some progress, but I am currently hitting a wall. I am able to write to the heimdall server and also get responses. I am also able to open a connection to the control socket and send commands. Now the two big problems (which might be linked):

  1. When I read the frame header, after sending a control command, nothing changes (sometimes the sync flag goes to 0). Also even tough the frequency is set in the daq_chain_config.ini to 80MHz and then also by the control command, I get ridicuosly high numbers as center frequency: 343597383680000000 (The 80 MHz are at the end and the front number seems to be a windows pointing error, but i am on linux). The gains are set for channels 3-7 instead of 1-5 and some more weird stuff.
  2. after initialization I only get dummy data (cpi_length = 0). Either data or raw data (?) with 32,768 bytes. From looking at the documentation ( heimdall_daq_fw/Documentation/HDAQ_firmware_ver1.0.20201130.pdf at main · krakenrf/heimdall_daq_fw · GitHub ) chapter 2.9.1 I guess this has something to do with calibration, but neither cal_track_mode=2 nor =0 work for me.

I wasn’t able to find any further documentation or code examples besides the gnuradio block and that hasn’t helped me yet. Does someone have experience writing custom software to read the IQ frames and can help me?

Thanks a lot in advance
Alex

Here is the source code of my KrakenSDRSource module in Julia

module KrakenSDR

#= 
   Julia translation of the KrakenSDR Python source block.
   This module provides a Julia implementation of the KrakenSDR source,
   including socket communication, IQ header parsing, and sample buffering.
   
   Note: Julia does not have a direct equivalent to GNU Radio's sync_block.
   This implementation uses Julia's native socket and threading capabilities.
   You may need to integrate this with a Julia DSP or SDR framework (e.g., DSP.jl, Sockets.jl).
=#

using Sockets
using Base.Threads
using Base.Libc: ltoh, htol

export KrakenSDRSource, work!, stop!, set_center_freq!, set_if_gain!

# --- Constants and Types ---
const FRAME_TYPE_DATA = 0
const FRAME_TYPE_DUMMY = 1
const FRAME_TYPE_RAMP = 2
const FRAME_TYPE_CAL = 3
const FRAME_TYPE_TRIGW = 4

const SYNC_WORD = 0x2bf7b95a #2bf7b95a

const HEADER_SIZE = 1024
const RESERVED_BYTES = 192

# Valid gains for KrakenSDR
const VALID_GAINS = [0, 0.9, 1.4, 2.7, 3.7, 7.7, 8.7, 12.5, 14.4, 15.7, 16.6, 19.7, 20.7, 22.9, 25.4, 28.0, 29.7, 32.8, 33.8, 36.4, 37.2, 38.6, 40.2, 42.1, 43.4, 43.9, 44.5, 48.0, 49.6]

# --- IQHeader Struct ---
mutable struct IQHeader
    header_size::UInt32
    reserved_bytes::UInt32
    # Fields read from socket
    sync_word::UInt32
    frame_type::UInt32
    hardware_id::String
    unit_id::UInt32
    active_ant_chs::UInt32
    ioo_type::UInt32
    rf_center_freq::UInt64
    adc_sampling_freq::UInt64
    sampling_freq::UInt64
    cpi_length::UInt32
    time_stamp::UInt64
    daq_block_index::UInt32
    cpi_index::UInt32
    ext_integration_cntr::UInt64
    data_type::UInt32
    sample_bit_depth::UInt32
    adc_overdrive_flags::UInt32
    if_gains::Vector{UInt32}
    delay_sync_flag::UInt32
    iq_sync_flag::UInt32
    sync_state::UInt32
    noise_source_state::UInt32
    reserved::Vector{UInt32}
    header_version::UInt32

    function IQHeader()
        new(
            # # 1-2: Constants
            HEADER_SIZE, RESERVED_BYTES,
            # 3-19: Scalar fields (UInt32/UInt64/String)
            SYNC_WORD, 0, "", 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            # 20: if_gains (Vector{UInt32})
            zeros(UInt32, 32),
            # 21-24: Scalar fields (UInt32)
            0, 0, 0, 0,
            # 25: reserved (Vector{UInt32})
            zeros(UInt32, RESERVED_BYTES),
            # 26: header_version (UInt32)
            0
        )
    end
end

function decode_header!(header::IQHeader, iq_header_bytes::Vector{UInt8})
    io = IOBuffer(iq_header_bytes)
    header.sync_word = read(io, UInt32)
    header.frame_type = read(io, UInt32)
    header.hardware_id = String(read(io, 16)) |> strip
    skip(io, 16 - length(header.hardware_id))
    header.unit_id = read(io, UInt32)
    header.active_ant_chs = read(io, UInt32)
    header.ioo_type = read(io, UInt32)
    header.rf_center_freq = read(io, UInt64)
    header.adc_sampling_freq = read(io, UInt64)
    header.sampling_freq = read(io, UInt64)
    header.cpi_length = read(io, UInt32)
    header.time_stamp = read(io, UInt64)
    header.daq_block_index = read(io, UInt32)
    header.cpi_index = read(io, UInt32)
    header.ext_integration_cntr = read(io, UInt64)
    header.data_type = read(io, UInt32)
    header.sample_bit_depth = read(io, UInt32)
    header.adc_overdrive_flags = read(io, UInt32)
    header.if_gains = [read(io, UInt32) for _ in 1:32]
    header.delay_sync_flag = read(io, UInt32)
    header.iq_sync_flag = read(io, UInt32)
    header.sync_state = read(io, UInt32)
    header.noise_source_state = read(io, UInt32)
    header.reserved = [read(io, UInt32) for _ in 1:RESERVED_BYTES]
    header.header_version = read(io, UInt32)
    return header
end

function dump_header(header::IQHeader)
    println("Sync word: $(header.sync_word)")
    println("Header version: $(header.header_version)")
    println("Frame type: $(header.frame_type)")
    println("Hardware ID: $(header.hardware_id)")
    println("Unit ID: $(header.unit_id)")
    println("Active antenna channels: $(header.active_ant_chs)")
    println("Illuminator type: $(header.ioo_type)")
    println("RF center frequency: $(header.rf_center_freq) Hz")
    println("ADC sampling frequency: $(header.adc_sampling_freq) Hz")
    println("IQ sampling frequency: $(header.sampling_freq) Hz")
    println("CPI length: $(header.cpi_length)")
    println("Unix Epoch timestamp: $(header.time_stamp)")
    println("DAQ block index: $(header.daq_block_index)")
    println("CPI index: $(header.cpi_index)")
    println("Extended integration counter: $(header.ext_integration_cntr)")
    println("Data type: $(header.data_type)")
    println("Sample bit depth: $(header.sample_bit_depth)")
    println("ADC overdrive flags: $(header.adc_overdrive_flags)")
    for (i, gain) in enumerate(header.if_gains)
        println("Ch: $i IF gain: $(gain / 10) dB")
    end
    println("Delay sync flag: $(header.delay_sync_flag)")
    println("IQ sync flag: $(header.iq_sync_flag)")
    println("Sync state: $(header.sync_state)")
    println("Noise source state: $(header.noise_source_state)")
end

# --- KrakenSDR Source ---
mutable struct KrakenSDRSource
    ipAddr::String
    port::Int
    ctrlPort::Int
    numChannels::Int
    freq::Float64
    gain::Vector{Float64}
    debug::Bool
    iq_header::IQHeader
    
    # Data Interface
    socket_inst::TCPSocket
    receiver_connection_status::Bool
    receiverBufferSize::Int
    
    # Control Interface
    ctr_iface_socket::TCPSocket
    ctr_iface_port::Int
    ctr_iface_thread_lock::ReentrantLock
    
    # Buffering
    cpi_len::Int
    total_fetched::Int
    iq_samples::Union{Nothing, Matrix{ComplexF32}}
    iq_sample_queue::Channel{Matrix{ComplexF32}}
    
    # Threading
    stop_threads::Bool
    buffer_thread::Union{Nothing, Task}
    
    function KrakenSDRSource(
        ipAddr::String="127.0.0.1",
        port::Int=5000,
        ctrlPort::Int=5001,
        numChannels::Int=5,
        freq::Float64=416.588,
        gain::Vector{Float64}=[10.0],
        debug::Bool=false
    )
        # Initialize header
        iq_header = IQHeader()
        
        # Initialize sockets
        socket_inst = Sockets.TCPSocket()
        ctr_iface_socket = Sockets.TCPSocket()
        
        # Initialize queue
        iq_sample_queue = Channel{Matrix{ComplexF32}}(10)
        
        # Connect and initialize
        source = new(
            ipAddr, port, ctrlPort, numChannels, freq, gain, debug, iq_header,
            socket_inst, false, 2^18,
            ctr_iface_socket, ctrlPort, ReentrantLock(),
            0, 0, nothing, iq_sample_queue,
            false, nothing
        )
        
        if get_iq_online!(source) == -1
            @error "Failed to initialize connection"
            eth_close!(source)
            return source
        end
        
        source.cpi_len = source.iq_header.cpi_length
        source.total_fetched = source.iq_header.cpi_length
        @info "IQ Data collection online"
        
        # Start buffer thread
        source.buffer_thread = @task buffer_iq_samples!(source)
        schedule(source.buffer_thread)
        @info "Buffer started"
        
        return source
    end
end

# --- Helper Functions ---
function find_closest_gain(target::Float64)
    # Find the index of the closest gain in VALID_GAINS
    closest_index = 1
    min_diff = abs(VALID_GAINS[1] - target)
    for (i, gain) in enumerate(VALID_GAINS)
        diff = abs(gain - target)
        if diff < min_diff
            min_diff = diff
            closest_index = i
        end
    end
    return closest_index
end

function eth_connect!(source::KrakenSDRSource)
    try
        if !source.receiver_connection_status
            source.socket_inst = connect(source.ipAddr, source.port)
            write(source.socket_inst, "streaming")
            flush(source.socket_inst)
            sleep(1.0)

            source.ctr_iface_socket = connect(source.ipAddr, source.ctrlPort)
            source.receiver_connection_status = true
            ctr_iface_init!(source)
            sleep(0.5)  # Delay for KrakenSDR to process
            set_center_freq!(source, source.freq)
            sleep(0.5)
            set_if_gain!(source, source.gain)
            sleep(0.5)

            write(source.socket_inst, "IQDownload")
            flush(source.socket_inst)

            # Verify settings are applied
            @info "Waiting for KrakenSDR to apply settings..."
            deadline = time() + 10.0
            while time() < deadline
                test_frame = receive_iq_frame!(source)
                if !isnothing(test_frame)
                    expected_freq = Int(source.freq * 1e6)  # MHz → Hz
                    expected_gain1 = Int(VALID_GAINS[find_closest_gain(source.gain[1])] * 10)
                    expected_gain2 = Int(VALID_GAINS[find_closest_gain(source.gain[2])] * 10)
                    if source.iq_header.rf_center_freq == expected_freq &&
                       source.iq_header.if_gains[1] == expected_gain1 &&
                       source.iq_header.if_gains[2] == expected_gain2 &&
                       source.iq_header.sync_state == 1
                        @info "KrakenSDR settings applied and synchronized."
                        break
                    end
                end
                sleep(0.5)
            end
            if time() >= deadline
                @error "Timeout waiting for KrakenSDR to apply settings!"
                return -1
            end
            @info "Ethernet connection complete"
        end
    catch e
        @error "Ethernet Connection Failed" exception=(e, catch_backtrace())
        source.receiver_connection_status = false
        eth_close!(source)
        return -1
    end
    return 0
end

function ctr_iface_init!(source::KrakenSDRSource)
    if source.receiver_connection_status
        cmd = "INIT"
        msg_bytes = vcat([UInt8(c) for c in cmd], zeros(UInt8, 124))
        @threads for _ in 1:1  # Simulate thread
            ctr_iface_communication!(source, msg_bytes, cmd)
        end
    end
end

function ctr_iface_communication!(source::KrakenSDRSource, msg_bytes::Vector{UInt8}, cmd::String)
    lock(source.ctr_iface_thread_lock) do
        @info "Sending control message: $(cmd)"
        write(source.ctr_iface_socket, msg_bytes)
        flush(source.ctr_iface_socket)

        # Single buffer definition (no redundancy)
        reply_msg_bytes = Vector{UInt8}(undef, 128)
        bytes_read = 0
        deadline = time() + 5.0  # 5-second timeout (matches Python)

        while bytes_read < 128 && time() < deadline
            n = readbytes!(source.ctr_iface_socket, view(reply_msg_bytes, bytes_read+1:128))
            bytes_read += n
            if n == 0
                error("Connection closed while reading reply")
            end
            sleep(0.001)  # Prevent tight loop
        end

        if bytes_read == 0
            @warn "Timeout waiting for reply from KrakenSDR"
            return
        elseif bytes_read < 128
            @warn "Incomplete reply: only $bytes_read/128 bytes received"
        end

        @info "Control interface communication finished"
        status = String(reply_msg_bytes[1:min(4, bytes_read)])
        if status == "FNSD"
            @info "Reconfiguration successfully finished"
        else
            @warn "Failed to set requested parameter, reply: $status"
        end
    end
end


function set_center_freq!(source::KrakenSDRSource, center_freq::Union{Float64, Int})
    if source.receiver_connection_status
        # Convert Float64 (MHz) to Int (Hz)
        freq_hz = Int(center_freq * 1e6)  # e.g., 416.588 MHz → 416588000 Hz
        source.freq = freq_hz

        cmd = "FREQ"
        # Convert UInt64 to bytes (little-endian)
        freq_bytes = htol(UInt64(freq_hz))
        freq_byte_vector = reinterpret(UInt8, [freq_bytes])[:]  # 8 bytes

        msg_bytes = vcat(
            [UInt8(c) for c in cmd],  # "FREQ" (4 bytes)
            freq_byte_vector,          # 8 bytes (little-endian freq)
            zeros(UInt8, 116)        # Pad to 128 bytes total
        )
        @threads for _ in 1:1
            ctr_iface_communication!(source, msg_bytes, cmd)
        end
    end
end

function set_if_gain!(source::KrakenSDRSource, gain::Vector{Float64})
    if source.receiver_connection_status
        cmd = "GAIN"
        adjusted_gain = Vector{Float64}(undef, source.numChannels)
        for i in 1:source.numChannels
            if i <= length(gain)
                adjusted_gain[i] = gain[i]
            else
                adjusted_gain[i] = 0.0  # Default for unused channels
            end
        end
        # Find closest valid gains
        adjusted_gain = [VALID_GAINS[find_closest_gain(g)] for g in adjusted_gain]
        gain_list = [Int(g * 10) for g in adjusted_gain]  # Tenths of dB

        # Pack into 128-byte message
        gain_bytes = UInt8[]
        for g in gain_list
            append!(gain_bytes, reinterpret(UInt8, [htol(UInt32(g))])[:])
        end
        padding = 128 - (4 + length(gain_bytes))  # 4-byte command + 16-byte gains
        msg_bytes = vcat([UInt8(c) for c in cmd], gain_bytes, zeros(UInt8, padding))

        @threads for _ in 1:1
            ctr_iface_communication!(source, msg_bytes, cmd)
        end
    end
end

function get_iq_online!(source::KrakenSDRSource)
    if !source.receiver_connection_status
        @info "Connect before download"
        if eth_connect!(source) != 0
            return -1
        end
    end
    iq_samples = receive_iq_frame!(source)
    return iq_samples
end

function receive_iq_frame!(source::KrakenSDRSource)
    @info "Receive data..."
    # --- Read raw ComplexF32 samples (no header parsing) ---
    buffer = Vector{UInt8}(undef, source.receiverBufferSize)  # e.g., 262144 bytes
    n = readbytes!(source.socket_inst, buffer)
    if n == 0
        @warn "Connection closed"
        source.receiver_connection_status = false
        return nothing
    end

    # --- Ensure complete ComplexF32 samples (8 bytes each) ---
    n = n - (n % 8)  # Trim to multiple of 8
    if n == 0
        @warn "No complete samples received"
        return nothing
    end

    # --- Convert to ComplexF32 ---
    raw_samples = reinterpret(ComplexF32, buffer[1:n])

    # --- Reshape into channels × samples (interleaved: ch1, ch2, ch3, ch4, ch5, ch1, ch2, ...) ---
    num_channels = source.numChannels  # 5 for KrakenSDR
    num_samples = length(raw_samples)
    if num_samples % num_channels != 0
        @warn "Incomplete frame: $num_samples samples not divisible by $num_channels channels"
        return nothing
    end
    cpi_length = num_samples ÷ num_channels
    iq_samples = reshape(raw_samples, num_channels, cpi_length)

    # --- Manually set header fields (no real header) ---
    source.iq_header.sync_word = SYNC_WORD
    source.iq_header.frame_type = FRAME_TYPE_DATA
    source.iq_header.cpi_length = cpi_length
    source.iq_header.active_ant_chs = num_channels
    source.iq_header.rf_center_freq = Int(source.freq * 1e6)  # From your config
    source.iq_header.sample_bit_depth = 32  # ComplexF32 = 32 bits
    source.iq_header.sync_state = 1  # Assume synchronized
    source.iq_header.noise_source_state = 1  # Assume noise source enabled

    return iq_samples
end

function buffer_iq_samples!(source::KrakenSDRSource)
    while true
        if source.stop_threads
            return
        end
        iq_samples = receive_iq_frame!(source)
        if !isnothing(iq_samples)
            @info """Frame:
                Type: $(source.iq_header.frame_type)
                CPI Length: $(source.iq_header.cpi_length)
                Frequency: $(source.iq_header.rf_center_freq) Hz
                Gain: $(source.iq_header.if_gains[1:5] ./ 10) dB
                Sync State: $(source.iq_header.sync_state)"""
            if source.iq_header.frame_type == FRAME_TYPE_DATA && source.iq_header.cpi_length > 0
                try
                    put!(source.iq_sample_queue, iq_samples)
                catch e
                    @error "Failed to put IQ Samples into the Queue: $e"
                end
            end
        end
        sleep(0.001)
    end
end

function work!(source::KrakenSDRSource)
    return take!(source.iq_sample_queue)
end

function stop!(source::KrakenSDRSource)
    source.stop_threads = true
    wait(source.buffer_thread)
    eth_close!(source)
    return true
end

function eth_close!(source::KrakenSDRSource)
    try
        @info "Closing Ethernet connection"
        if source.receiver_connection_status
            write(source.socket_inst, "q")
            close(source.socket_inst)
            source.socket_inst = Sockets.TCPSocket()
            
            # Close control interface
            exit_message_bytes = vcat([UInt8(c) for c in "EXIT"], zeros(UInt8, 124))
            write(source.ctr_iface_socket, exit_message_bytes)
            close(source.ctr_iface_socket)
            source.ctr_iface_socket = Sockets.TCPSocket()
            
            source.receiver_connection_status = false
        end
    catch e
        println("Error closing connection: $e")
        return -1
    end
    return 0
end

# --- Example Usage ---
# source = KrakenSDRSource("127.0.0.1", 5000, 5001, 5, 416.588, [10.0], true)
# output_items = work!(source, 1024)
# stop!(source)

end # module