Greetings,
I’ve been digging heavily into KrakenSDR recently while developing a GNU Radio application in C++. Since gr-krakensdr is only written in Python, I had to rewrite the driver in C++ manually, which I kinda successfully did for eth output mode. However, I have some troubles when trying to access the shared memory interface.
I looked up how its done in krakensdr_doa and then wrote the shmemIface.py analog in C++ and it seems to be valid, but I’m getting an error within Heimdall DAQ hardware controller after the named pipes (fw_delay_sync_iq and bw_delay_sync_iq) connect. Here’s a verbose hwc.log:
INFO:__main__:Opening control interface server on ip address: - and port: 5001
INFO:__main__:Antenna channles 5
INFO:__main__:IQ samples per channel 1048576
INFO:__main__:Processing sample size: 8192
INFO:__main__:Waiting for new connection
INFO:__main__:ADPIS state: 0
WARNING:__main__:Reference channel index is fixed 0
INFO:__main__:Hardware Controller initialized
INFO:__main__:Conenction established
WARNING:__main__:Got command INIT
INFO:__main__:Inititalization command received
Traceback (most recent call last):
File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/hw_controller.py", line 704, in <module>
HWC_inst0.start()
File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/hw_controller.py", line 415, in start
active_buff_index = self.in_shmem_iface.wait_buff_free()
File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/shmemIface.py", line 188, in wait_buff_free
signal = unpack('B', os.read(self.fw_ctr_fifo, 1))[0]
struct.error: unpack requires a buffer of 1 bytes
I did some debugging of Heimdall DAQ and found out that in delay_sync.py the delay_sync_iq and delay_sync_hwc pipes are opened successfully. However, on attempt to send_ctr_buff_ready
on delay_sync_iq I’m getting this error in delay_sync.log:
INFO:__main__:IQ adjustment vector: abs:[1. 1. 1. 1. 1.]
INFO:__main__:IQ adjustment vector: phase:[0. 0. 0. 0. 0.]
INFO:__main__:Antenna channles 5
INFO:__main__:IQ samples per channel 1048576
INFO:__main__:Delay synchronizer initialized
INFO:__main__:open_interfaces
WARNING:shmemIface:Shared memory not exist
WARNING:shmemIface:Shared memory not exist
WARNING:shmemIface:Shared memory not exist
WARNING:shmemIface:Shared memory not exist
INFO:shmemIface:decimator_out
DEBUG:__main__:Type:0, CPI: 0, State:STATE_INIT
INFO:__main__:Delay track statistic [sync fails ,sample, iq, total][0,0,0/3]
INFO:shmemIface:delay_sync_iq
Traceback (most recent call last):
File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/delay_sync.py", line 821, in <module>
delay_synchronizer_inst0.start()
File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/delay_sync.py", line 788, in start
self.out_shmem_iface_iq.send_ctr_buff_ready(active_buffer_index_iq)
File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/shmemIface.py", line 91, in send_ctr_buff_ready
os.write(self.fw_ctr_fifo, pack('B',A_BUFF_READY))
BrokenPipeError: [Errno 32] Broken pipe
/home/user/miniforge3/envs/kraken/lib/python3.9/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 6 leaked shared_memory objects to clean up at shutdown
warnings.warn('resource_tracker: There appear to be %d '
I thought that shmemIface might be outdated and copied the updated version from krakensdr_doa. I made it so it will retry for 5 seconds straight if it can’t read, but that didn’t help too: it first fails with [Errno 11] Resource temporarily unavailable
and then crashes with the original error for both hwc and delay_sync scripts.
FYI_1: The original krakensdr_doa from the same source code is running absolutely fine. Since it uses shmem interface internally, there should be something wrong with my code? But why?
FYI_1: I’m only running the Hemdall DAQ software, i.e. there’s no KrakenSDR DOA in the background trying to use the same pipes.
Does anyone have any idea on what am I doing wrong with my code? Any help appreciated!
Please see the code below:
Usage
// Initialize shared memory interface
if (data_interface == "shmem") {
std::string daq_shmem_control_path = std::filesystem::path("/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_data_control/").string();
shmem_iface = InShmemIface("delay_sync_iq", daq_shmem_control_path, 5.0);
if (!shmem_iface.init_ok) {
shmem_iface.destory_sm_buffer();
throw std::runtime_error("Shared memory initialization failed");
}
}
InShmemIface.hpp
#include <chrono>
#include <fcntl.h>
#include <iostream>
#include <string>
#include <sys/mman.h>
#include <sys/stat.h>
#include <thread>
#include <unistd.h>
#include <vector>
#define A_BUFF_READY 1
#define B_BUFF_READY 2
#define INIT_READY 10
#define TERMINATE 255
#define SLEEP_TIME_BETWEEN_READ_ATTEMPTS 0.01
class InShmemIface {
private:
double num_read_attempts;
std::string shmem_name;
int fw_ctr_fifo;
int bw_ctr_fifo;
int shmem_A;
int shmem_B;
size_t shmem_size;
public:
bool init_ok;
uint8_t* buffer_A;
uint8_t* buffer_B;
InShmemIface() {
init_ok = false;
num_read_attempts = 0.0;
shmem_name = "";
fw_ctr_fifo = -1;
bw_ctr_fifo = -1;
shmem_A = -1;
shmem_B = -1;
shmem_size = 0;
buffer_A = nullptr;
buffer_B = nullptr;
}
InShmemIface(const std::string& shmem_name, const std::string& ctr_fifo_path = "_data_control/", double read_timeout = 0.0) {
init_ok = true;
num_read_attempts = read_timeout / SLEEP_TIME_BETWEEN_READ_ATTEMPTS;
this->shmem_name = shmem_name;
fw_ctr_fifo = open((ctr_fifo_path + "fw_" + shmem_name).c_str(), O_RDONLY | O_NONBLOCK);
bw_ctr_fifo = open((ctr_fifo_path + "bw_" + shmem_name).c_str(), O_WRONLY);
if (fw_ctr_fifo == -1 || bw_ctr_fifo == -1) {
std::cerr << "Failed to open control fifos: " << strerror(errno) << std::endl;
init_ok = false;
}
if (fw_ctr_fifo != -1) {
int signal = read_fw_ctr_fifo();
if (signal == INIT_READY) {
shmem_A = shm_open((shmem_name + "_A").c_str(), O_RDONLY, 0);
shmem_B = shm_open((shmem_name + "_B").c_str(), O_RDONLY, 0);
if (shmem_A == -1 || shmem_B == -1) {
std::cerr << "Failed to open shared memories: " << strerror(errno) << std::endl;
init_ok = false;
}
if (init_ok) {
struct stat statbuf;
fstat(shmem_A, &statbuf);
size_t shmem_size = statbuf.st_size;
buffer_A = static_cast<uint8_t*>(mmap(NULL, shmem_size, PROT_READ, MAP_SHARED, shmem_A, 0));
buffer_B = static_cast<uint8_t*>(mmap(NULL, shmem_size, PROT_READ, MAP_SHARED, shmem_B, 0));
}
} else {
init_ok = false;
}
}
}
~InShmemIface() {
destory_sm_buffer();
}
void send_ctr_buff_ready(int active_buffer_index) {
if (active_buffer_index == 0) {
int signal = A_BUFF_READY;
ssize_t _ = write(bw_ctr_fifo, &signal, sizeof(signal));
} else if (active_buffer_index == 1) {
int signal = B_BUFF_READY;
ssize_t _ = write(bw_ctr_fifo, &signal, sizeof(signal));
}
}
void destory_sm_buffer() {
if (buffer_A != nullptr) {
munmap(buffer_A, shmem_size);
}
if (buffer_B != nullptr) {
munmap(buffer_B, shmem_size);
}
if (shmem_A != -1) {
close(shmem_A);
}
if (shmem_B != -1) {
close(shmem_B);
}
if (fw_ctr_fifo != -1) {
close(fw_ctr_fifo);
}
if (bw_ctr_fifo != -1) {
close(bw_ctr_fifo);
}
}
int wait_buff_free() {
int signal = read_fw_ctr_fifo();
if (signal == A_BUFF_READY) {
return 0;
} else if (signal == B_BUFF_READY) {
return 1;
} else if (signal == TERMINATE) {
return TERMINATE;
} else {
return -1;
}
}
int read_fw_ctr_fifo() {
for (int i = 0; i < num_read_attempts; i++) {
uint8_t signal;
ssize_t bytes_read = read(fw_ctr_fifo, &signal, sizeof(signal));
if (bytes_read == -1) {
std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(SLEEP_TIME_BETWEEN_READ_ATTEMPTS * 1000)));
} else {
return signal;
}
}
return -1;
}
};