Hi all,
I am currently an application, that is supposed to read and interpret the data coming from the heimdall server over TCP. I took the advice from this post: How to get TCP data (IQ at port: 5000 and Control at port: 5001) from Heimdal_Daq_Fw for other custom application? and wrote a copy of the krakenSDRSource block for GNURadio.
I am pretty sure, that everything on the hardware/heimdall server site is setup correctly, as I can see the data both on GNURadio and also using the heimdall rec tool.
I rebuilt the Block as a Julia module and made some progress, but I am currently hitting a wall. I am able to write to the heimdall server and also get responses. I am also able to open a connection to the control socket and send commands. Now the two big problems (which might be linked):
- When I read the frame header, after sending a control command, nothing changes (sometimes the sync flag goes to 0). Also even tough the frequency is set in the daq_chain_config.ini to 80MHz and then also by the control command, I get ridicuosly high numbers as center frequency: 343597383680000000 (The 80 MHz are at the end and the front number seems to be a windows pointing error, but i am on linux). The gains are set for channels 3-7 instead of 1-5 and some more weird stuff.
- after initialization I only get dummy data (cpi_length = 0). Either data or raw data (?) with 32,768 bytes. From looking at the documentation ( heimdall_daq_fw/Documentation/HDAQ_firmware_ver1.0.20201130.pdf at main · krakenrf/heimdall_daq_fw · GitHub ) chapter 2.9.1 I guess this has something to do with calibration, but neither cal_track_mode=2 nor =0 work for me.
I wasn’t able to find any further documentation or code examples besides the gnuradio block and that hasn’t helped me yet. Does someone have experience writing custom software to read the IQ frames and can help me?
Thanks a lot in advance
Alex
Here is the source code of my KrakenSDRSource module in Julia
module KrakenSDR
#=
Julia translation of the KrakenSDR Python source block.
This module provides a Julia implementation of the KrakenSDR source,
including socket communication, IQ header parsing, and sample buffering.
Note: Julia does not have a direct equivalent to GNU Radio's sync_block.
This implementation uses Julia's native socket and threading capabilities.
You may need to integrate this with a Julia DSP or SDR framework (e.g., DSP.jl, Sockets.jl).
=#
using Sockets
using Base.Threads
using Base.Libc: ltoh, htol
export KrakenSDRSource, work!, stop!, set_center_freq!, set_if_gain!
# --- Constants and Types ---
const FRAME_TYPE_DATA = 0
const FRAME_TYPE_DUMMY = 1
const FRAME_TYPE_RAMP = 2
const FRAME_TYPE_CAL = 3
const FRAME_TYPE_TRIGW = 4
const SYNC_WORD = 0x2bf7b95a #2bf7b95a
const HEADER_SIZE = 1024
const RESERVED_BYTES = 192
# Valid gains for KrakenSDR
const VALID_GAINS = [0, 0.9, 1.4, 2.7, 3.7, 7.7, 8.7, 12.5, 14.4, 15.7, 16.6, 19.7, 20.7, 22.9, 25.4, 28.0, 29.7, 32.8, 33.8, 36.4, 37.2, 38.6, 40.2, 42.1, 43.4, 43.9, 44.5, 48.0, 49.6]
# --- IQHeader Struct ---
mutable struct IQHeader
header_size::UInt32
reserved_bytes::UInt32
# Fields read from socket
sync_word::UInt32
frame_type::UInt32
hardware_id::String
unit_id::UInt32
active_ant_chs::UInt32
ioo_type::UInt32
rf_center_freq::UInt64
adc_sampling_freq::UInt64
sampling_freq::UInt64
cpi_length::UInt32
time_stamp::UInt64
daq_block_index::UInt32
cpi_index::UInt32
ext_integration_cntr::UInt64
data_type::UInt32
sample_bit_depth::UInt32
adc_overdrive_flags::UInt32
if_gains::Vector{UInt32}
delay_sync_flag::UInt32
iq_sync_flag::UInt32
sync_state::UInt32
noise_source_state::UInt32
reserved::Vector{UInt32}
header_version::UInt32
function IQHeader()
new(
# # 1-2: Constants
HEADER_SIZE, RESERVED_BYTES,
# 3-19: Scalar fields (UInt32/UInt64/String)
SYNC_WORD, 0, "", 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
# 20: if_gains (Vector{UInt32})
zeros(UInt32, 32),
# 21-24: Scalar fields (UInt32)
0, 0, 0, 0,
# 25: reserved (Vector{UInt32})
zeros(UInt32, RESERVED_BYTES),
# 26: header_version (UInt32)
0
)
end
end
function decode_header!(header::IQHeader, iq_header_bytes::Vector{UInt8})
io = IOBuffer(iq_header_bytes)
header.sync_word = read(io, UInt32)
header.frame_type = read(io, UInt32)
header.hardware_id = String(read(io, 16)) |> strip
skip(io, 16 - length(header.hardware_id))
header.unit_id = read(io, UInt32)
header.active_ant_chs = read(io, UInt32)
header.ioo_type = read(io, UInt32)
header.rf_center_freq = read(io, UInt64)
header.adc_sampling_freq = read(io, UInt64)
header.sampling_freq = read(io, UInt64)
header.cpi_length = read(io, UInt32)
header.time_stamp = read(io, UInt64)
header.daq_block_index = read(io, UInt32)
header.cpi_index = read(io, UInt32)
header.ext_integration_cntr = read(io, UInt64)
header.data_type = read(io, UInt32)
header.sample_bit_depth = read(io, UInt32)
header.adc_overdrive_flags = read(io, UInt32)
header.if_gains = [read(io, UInt32) for _ in 1:32]
header.delay_sync_flag = read(io, UInt32)
header.iq_sync_flag = read(io, UInt32)
header.sync_state = read(io, UInt32)
header.noise_source_state = read(io, UInt32)
header.reserved = [read(io, UInt32) for _ in 1:RESERVED_BYTES]
header.header_version = read(io, UInt32)
return header
end
function dump_header(header::IQHeader)
println("Sync word: $(header.sync_word)")
println("Header version: $(header.header_version)")
println("Frame type: $(header.frame_type)")
println("Hardware ID: $(header.hardware_id)")
println("Unit ID: $(header.unit_id)")
println("Active antenna channels: $(header.active_ant_chs)")
println("Illuminator type: $(header.ioo_type)")
println("RF center frequency: $(header.rf_center_freq) Hz")
println("ADC sampling frequency: $(header.adc_sampling_freq) Hz")
println("IQ sampling frequency: $(header.sampling_freq) Hz")
println("CPI length: $(header.cpi_length)")
println("Unix Epoch timestamp: $(header.time_stamp)")
println("DAQ block index: $(header.daq_block_index)")
println("CPI index: $(header.cpi_index)")
println("Extended integration counter: $(header.ext_integration_cntr)")
println("Data type: $(header.data_type)")
println("Sample bit depth: $(header.sample_bit_depth)")
println("ADC overdrive flags: $(header.adc_overdrive_flags)")
for (i, gain) in enumerate(header.if_gains)
println("Ch: $i IF gain: $(gain / 10) dB")
end
println("Delay sync flag: $(header.delay_sync_flag)")
println("IQ sync flag: $(header.iq_sync_flag)")
println("Sync state: $(header.sync_state)")
println("Noise source state: $(header.noise_source_state)")
end
# --- KrakenSDR Source ---
mutable struct KrakenSDRSource
ipAddr::String
port::Int
ctrlPort::Int
numChannels::Int
freq::Float64
gain::Vector{Float64}
debug::Bool
iq_header::IQHeader
# Data Interface
socket_inst::TCPSocket
receiver_connection_status::Bool
receiverBufferSize::Int
# Control Interface
ctr_iface_socket::TCPSocket
ctr_iface_port::Int
ctr_iface_thread_lock::ReentrantLock
# Buffering
cpi_len::Int
total_fetched::Int
iq_samples::Union{Nothing, Matrix{ComplexF32}}
iq_sample_queue::Channel{Matrix{ComplexF32}}
# Threading
stop_threads::Bool
buffer_thread::Union{Nothing, Task}
function KrakenSDRSource(
ipAddr::String="127.0.0.1",
port::Int=5000,
ctrlPort::Int=5001,
numChannels::Int=5,
freq::Float64=416.588,
gain::Vector{Float64}=[10.0],
debug::Bool=false
)
# Initialize header
iq_header = IQHeader()
# Initialize sockets
socket_inst = Sockets.TCPSocket()
ctr_iface_socket = Sockets.TCPSocket()
# Initialize queue
iq_sample_queue = Channel{Matrix{ComplexF32}}(10)
# Connect and initialize
source = new(
ipAddr, port, ctrlPort, numChannels, freq, gain, debug, iq_header,
socket_inst, false, 2^18,
ctr_iface_socket, ctrlPort, ReentrantLock(),
0, 0, nothing, iq_sample_queue,
false, nothing
)
if get_iq_online!(source) == -1
@error "Failed to initialize connection"
eth_close!(source)
return source
end
source.cpi_len = source.iq_header.cpi_length
source.total_fetched = source.iq_header.cpi_length
@info "IQ Data collection online"
# Start buffer thread
source.buffer_thread = @task buffer_iq_samples!(source)
schedule(source.buffer_thread)
@info "Buffer started"
return source
end
end
# --- Helper Functions ---
function find_closest_gain(target::Float64)
# Find the index of the closest gain in VALID_GAINS
closest_index = 1
min_diff = abs(VALID_GAINS[1] - target)
for (i, gain) in enumerate(VALID_GAINS)
diff = abs(gain - target)
if diff < min_diff
min_diff = diff
closest_index = i
end
end
return closest_index
end
function eth_connect!(source::KrakenSDRSource)
try
if !source.receiver_connection_status
source.socket_inst = connect(source.ipAddr, source.port)
write(source.socket_inst, "streaming")
flush(source.socket_inst)
sleep(1.0)
source.ctr_iface_socket = connect(source.ipAddr, source.ctrlPort)
source.receiver_connection_status = true
ctr_iface_init!(source)
sleep(0.5) # Delay for KrakenSDR to process
set_center_freq!(source, source.freq)
sleep(0.5)
set_if_gain!(source, source.gain)
sleep(0.5)
write(source.socket_inst, "IQDownload")
flush(source.socket_inst)
# Verify settings are applied
@info "Waiting for KrakenSDR to apply settings..."
deadline = time() + 10.0
while time() < deadline
test_frame = receive_iq_frame!(source)
if !isnothing(test_frame)
expected_freq = Int(source.freq * 1e6) # MHz → Hz
expected_gain1 = Int(VALID_GAINS[find_closest_gain(source.gain[1])] * 10)
expected_gain2 = Int(VALID_GAINS[find_closest_gain(source.gain[2])] * 10)
if source.iq_header.rf_center_freq == expected_freq &&
source.iq_header.if_gains[1] == expected_gain1 &&
source.iq_header.if_gains[2] == expected_gain2 &&
source.iq_header.sync_state == 1
@info "KrakenSDR settings applied and synchronized."
break
end
end
sleep(0.5)
end
if time() >= deadline
@error "Timeout waiting for KrakenSDR to apply settings!"
return -1
end
@info "Ethernet connection complete"
end
catch e
@error "Ethernet Connection Failed" exception=(e, catch_backtrace())
source.receiver_connection_status = false
eth_close!(source)
return -1
end
return 0
end
function ctr_iface_init!(source::KrakenSDRSource)
if source.receiver_connection_status
cmd = "INIT"
msg_bytes = vcat([UInt8(c) for c in cmd], zeros(UInt8, 124))
@threads for _ in 1:1 # Simulate thread
ctr_iface_communication!(source, msg_bytes, cmd)
end
end
end
function ctr_iface_communication!(source::KrakenSDRSource, msg_bytes::Vector{UInt8}, cmd::String)
lock(source.ctr_iface_thread_lock) do
@info "Sending control message: $(cmd)"
write(source.ctr_iface_socket, msg_bytes)
flush(source.ctr_iface_socket)
# Single buffer definition (no redundancy)
reply_msg_bytes = Vector{UInt8}(undef, 128)
bytes_read = 0
deadline = time() + 5.0 # 5-second timeout (matches Python)
while bytes_read < 128 && time() < deadline
n = readbytes!(source.ctr_iface_socket, view(reply_msg_bytes, bytes_read+1:128))
bytes_read += n
if n == 0
error("Connection closed while reading reply")
end
sleep(0.001) # Prevent tight loop
end
if bytes_read == 0
@warn "Timeout waiting for reply from KrakenSDR"
return
elseif bytes_read < 128
@warn "Incomplete reply: only $bytes_read/128 bytes received"
end
@info "Control interface communication finished"
status = String(reply_msg_bytes[1:min(4, bytes_read)])
if status == "FNSD"
@info "Reconfiguration successfully finished"
else
@warn "Failed to set requested parameter, reply: $status"
end
end
end
function set_center_freq!(source::KrakenSDRSource, center_freq::Union{Float64, Int})
if source.receiver_connection_status
# Convert Float64 (MHz) to Int (Hz)
freq_hz = Int(center_freq * 1e6) # e.g., 416.588 MHz → 416588000 Hz
source.freq = freq_hz
cmd = "FREQ"
# Convert UInt64 to bytes (little-endian)
freq_bytes = htol(UInt64(freq_hz))
freq_byte_vector = reinterpret(UInt8, [freq_bytes])[:] # 8 bytes
msg_bytes = vcat(
[UInt8(c) for c in cmd], # "FREQ" (4 bytes)
freq_byte_vector, # 8 bytes (little-endian freq)
zeros(UInt8, 116) # Pad to 128 bytes total
)
@threads for _ in 1:1
ctr_iface_communication!(source, msg_bytes, cmd)
end
end
end
function set_if_gain!(source::KrakenSDRSource, gain::Vector{Float64})
if source.receiver_connection_status
cmd = "GAIN"
adjusted_gain = Vector{Float64}(undef, source.numChannels)
for i in 1:source.numChannels
if i <= length(gain)
adjusted_gain[i] = gain[i]
else
adjusted_gain[i] = 0.0 # Default for unused channels
end
end
# Find closest valid gains
adjusted_gain = [VALID_GAINS[find_closest_gain(g)] for g in adjusted_gain]
gain_list = [Int(g * 10) for g in adjusted_gain] # Tenths of dB
# Pack into 128-byte message
gain_bytes = UInt8[]
for g in gain_list
append!(gain_bytes, reinterpret(UInt8, [htol(UInt32(g))])[:])
end
padding = 128 - (4 + length(gain_bytes)) # 4-byte command + 16-byte gains
msg_bytes = vcat([UInt8(c) for c in cmd], gain_bytes, zeros(UInt8, padding))
@threads for _ in 1:1
ctr_iface_communication!(source, msg_bytes, cmd)
end
end
end
function get_iq_online!(source::KrakenSDRSource)
if !source.receiver_connection_status
@info "Connect before download"
if eth_connect!(source) != 0
return -1
end
end
iq_samples = receive_iq_frame!(source)
return iq_samples
end
function receive_iq_frame!(source::KrakenSDRSource)
@info "Receive data..."
# --- Read raw ComplexF32 samples (no header parsing) ---
buffer = Vector{UInt8}(undef, source.receiverBufferSize) # e.g., 262144 bytes
n = readbytes!(source.socket_inst, buffer)
if n == 0
@warn "Connection closed"
source.receiver_connection_status = false
return nothing
end
# --- Ensure complete ComplexF32 samples (8 bytes each) ---
n = n - (n % 8) # Trim to multiple of 8
if n == 0
@warn "No complete samples received"
return nothing
end
# --- Convert to ComplexF32 ---
raw_samples = reinterpret(ComplexF32, buffer[1:n])
# --- Reshape into channels × samples (interleaved: ch1, ch2, ch3, ch4, ch5, ch1, ch2, ...) ---
num_channels = source.numChannels # 5 for KrakenSDR
num_samples = length(raw_samples)
if num_samples % num_channels != 0
@warn "Incomplete frame: $num_samples samples not divisible by $num_channels channels"
return nothing
end
cpi_length = num_samples ÷ num_channels
iq_samples = reshape(raw_samples, num_channels, cpi_length)
# --- Manually set header fields (no real header) ---
source.iq_header.sync_word = SYNC_WORD
source.iq_header.frame_type = FRAME_TYPE_DATA
source.iq_header.cpi_length = cpi_length
source.iq_header.active_ant_chs = num_channels
source.iq_header.rf_center_freq = Int(source.freq * 1e6) # From your config
source.iq_header.sample_bit_depth = 32 # ComplexF32 = 32 bits
source.iq_header.sync_state = 1 # Assume synchronized
source.iq_header.noise_source_state = 1 # Assume noise source enabled
return iq_samples
end
function buffer_iq_samples!(source::KrakenSDRSource)
while true
if source.stop_threads
return
end
iq_samples = receive_iq_frame!(source)
if !isnothing(iq_samples)
@info """Frame:
Type: $(source.iq_header.frame_type)
CPI Length: $(source.iq_header.cpi_length)
Frequency: $(source.iq_header.rf_center_freq) Hz
Gain: $(source.iq_header.if_gains[1:5] ./ 10) dB
Sync State: $(source.iq_header.sync_state)"""
if source.iq_header.frame_type == FRAME_TYPE_DATA && source.iq_header.cpi_length > 0
try
put!(source.iq_sample_queue, iq_samples)
catch e
@error "Failed to put IQ Samples into the Queue: $e"
end
end
end
sleep(0.001)
end
end
function work!(source::KrakenSDRSource)
return take!(source.iq_sample_queue)
end
function stop!(source::KrakenSDRSource)
source.stop_threads = true
wait(source.buffer_thread)
eth_close!(source)
return true
end
function eth_close!(source::KrakenSDRSource)
try
@info "Closing Ethernet connection"
if source.receiver_connection_status
write(source.socket_inst, "q")
close(source.socket_inst)
source.socket_inst = Sockets.TCPSocket()
# Close control interface
exit_message_bytes = vcat([UInt8(c) for c in "EXIT"], zeros(UInt8, 124))
write(source.ctr_iface_socket, exit_message_bytes)
close(source.ctr_iface_socket)
source.ctr_iface_socket = Sockets.TCPSocket()
source.receiver_connection_status = false
end
catch e
println("Error closing connection: $e")
return -1
end
return 0
end
# --- Example Usage ---
# source = KrakenSDRSource("127.0.0.1", 5000, 5001, 5, 416.588, [10.0], true)
# output_items = work!(source, 1024)
# stop!(source)
end # module