Shared memory delay_sync_iq access using C++

Greetings,
I’ve been digging heavily into KrakenSDR recently while developing a GNU Radio application in C++. Since gr-krakensdr is only written in Python, I had to rewrite the driver in C++ manually, which I more or less managed for the eth output mode. However, I’m having trouble accessing the shared memory interface.

I looked up how it’s done in krakensdr_doa and then wrote a C++ equivalent of shmemIface.py. It seems to be valid, but I’m getting an error within the Heimdall DAQ hardware controller after the named pipes (fw_delay_sync_iq and bw_delay_sync_iq) connect. Here’s a verbose hwc.log:

INFO:__main__:Opening control interface server on ip address: - and port: 5001
INFO:__main__:Antenna channles 5
INFO:__main__:IQ samples per channel 1048576
INFO:__main__:Processing sample size: 8192
INFO:__main__:Waiting for new connection
INFO:__main__:ADPIS state: 0
WARNING:__main__:Reference channel index is fixed 0 
INFO:__main__:Hardware Controller initialized
INFO:__main__:Conenction established 
WARNING:__main__:Got command INIT
INFO:__main__:Inititalization command received
Traceback (most recent call last):
  File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/hw_controller.py", line 704, in <module>
    HWC_inst0.start()
  File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/hw_controller.py", line 415, in start
    active_buff_index = self.in_shmem_iface.wait_buff_free()            
  File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/shmemIface.py", line 188, in wait_buff_free
    signal = unpack('B', os.read(self.fw_ctr_fifo, 1))[0]
struct.error: unpack requires a buffer of 1 bytes

I did some debugging of Heimdall DAQ and found out that in delay_sync.py the delay_sync_iq and delay_sync_hwc pipes are opened successfully. However, when it calls send_ctr_buff_ready on delay_sync_iq, I get this error in delay_sync.log:

INFO:__main__:IQ adjustment vector: abs:[1. 1. 1. 1. 1.]
INFO:__main__:IQ adjustment vector: phase:[0. 0. 0. 0. 0.]
INFO:__main__:Antenna channles 5
INFO:__main__:IQ samples per channel 1048576
INFO:__main__:Delay synchronizer initialized
INFO:__main__:open_interfaces
WARNING:shmemIface:Shared memory not exist
WARNING:shmemIface:Shared memory not exist
WARNING:shmemIface:Shared memory not exist
WARNING:shmemIface:Shared memory not exist
INFO:shmemIface:decimator_out
DEBUG:__main__:Type:0, CPI: 0, State:STATE_INIT
INFO:__main__:Delay track statistic [sync fails ,sample, iq, total][0,0,0/3]
INFO:shmemIface:delay_sync_iq
Traceback (most recent call last):
  File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/delay_sync.py", line 821, in <module>
    delay_synchronizer_inst0.start()
  File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/delay_sync.py", line 788, in start
    self.out_shmem_iface_iq.send_ctr_buff_ready(active_buffer_index_iq)
  File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/shmemIface.py", line 91, in send_ctr_buff_ready
    os.write(self.fw_ctr_fifo, pack('B',A_BUFF_READY))
BrokenPipeError: [Errno 32] Broken pipe
/home/user/miniforge3/envs/kraken/lib/python3.9/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 6 leaked shared_memory objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '

I thought that shmemIface might be outdated, so I copied the updated version from krakensdr_doa. I made it retry for 5 seconds straight if it can’t read, but that didn’t help either: it first fails with [Errno 11] Resource temporarily unavailable and then crashes with the original error for both the hwc and delay_sync scripts.
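
For reference, [Errno 11] is EAGAIN on Linux: a non-blocking read returns it when the FIFO has a writer but nothing is queued yet, whereas a read on a FIFO with no writer returns 0 bytes. Below is a minimal sketch of a timed wait that uses poll() instead of a sleep-and-retry loop and keeps those cases apart. The function name read_signal_with_timeout, the negative result codes and the demo helper are my own assumptions for illustration; they are not part of Heimdall or krakensdr_doa.

```cpp
#include <cstdint>
#include <poll.h>
#include <unistd.h>

// Result codes for this sketch (hypothetical, not Heimdall's).
constexpr int kTimeout = -1;
constexpr int kPeerClosed = -2;
constexpr int kError = -3;

// Waits up to timeout_ms for one control byte on fd.
// Returns the byte value (0..255) or one of the negative codes above.
int read_signal_with_timeout(int fd, int timeout_ms) {
    pollfd pfd{};
    pfd.fd = fd;
    pfd.events = POLLIN;
    int ready = poll(&pfd, 1, timeout_ms);
    if (ready == 0) return kTimeout;  // nothing arrived in time
    if (ready < 0) return kError;     // poll itself failed
    std::uint8_t signal_byte = 0;
    ssize_t n = read(fd, &signal_byte, 1);
    if (n == 1) return signal_byte;
    if (n == 0) return kPeerClosed;   // end of file: the writer closed its end
    return kError;
}

// Demo helper: exercises the three outcomes on an anonymous pipe.
struct DemoResult { int empty; int with_data; int closed; };

DemoResult demo_timed_read() {
    int p[2];
    DemoResult r{kError, kError, kError};
    if (pipe(p) != 0) return r;
    r.empty = read_signal_with_timeout(p[0], 50);      // writer open, no data
    std::uint8_t init_ready = 10;                      // same value as INIT_READY
    if (write(p[1], &init_ready, 1) == 1) {
        r.with_data = read_signal_with_timeout(p[0], 50);
    }
    close(p[1]);                                       // writer goes away
    r.closed = read_signal_with_timeout(p[0], 50);
    close(p[0]);
    return r;
}
```

Distinguishing "peer closed" from "timed out" matters here because retrying cannot help once the other side has closed its end.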

FYI_1: The original krakensdr_doa from the same source code runs absolutely fine. Since it uses the shmem interface internally, there must be something wrong with my code, but what?
FYI_2: I’m only running the Heimdall DAQ software, i.e. there’s no KrakenSDR DOA in the background trying to use the same pipes.

Does anyone have any idea what I am doing wrong in my code? Any help is appreciated!

Please see the code below:

Usage

  // Initialize shared memory interface
  if (data_interface == "shmem") {
      std::string daq_shmem_control_path = std::filesystem::path("/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_data_control/").string();                                     
      shmem_iface = InShmemIface("delay_sync_iq", daq_shmem_control_path, 5.0);
      if (!shmem_iface.init_ok) {
          shmem_iface.destory_sm_buffer();
          throw std::runtime_error("Shared memory initialization failed");
      }
  }
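
One thing worth checking in the snippet above, offered as a guess rather than a confirmed diagnosis: the statement shmem_iface = InShmemIface(...) builds a temporary, copy-assigns it into shmem_iface, and then destroys the temporary. If the class closes its descriptors in the destructor and is still copyable, the temporary closes the FIFO and shared memory descriptors that the surviving copy holds. The sketch below reproduces that effect on an ordinary pipe. FdHolder, MoveOnlyFd, fd_is_open and survives_assignment are hypothetical names used only for illustration.

```cpp
#include <fcntl.h>
#include <unistd.h>
#include <utility>

// Returns true while fd refers to an open descriptor.
bool fd_is_open(int fd) { return fcntl(fd, F_GETFD) != -1; }

// Hypothetical stand-in for a class that owns a descriptor but stays copyable.
struct FdHolder {
    int fd = -1;
    FdHolder() = default;
    explicit FdHolder(int f) : fd(f) {}
    ~FdHolder() { if (fd != -1) close(fd); }
};

// Same idea, but move-only: ownership is transferred instead of duplicated.
struct MoveOnlyFd {
    int fd = -1;
    MoveOnlyFd() = default;
    explicit MoveOnlyFd(int f) : fd(f) {}
    MoveOnlyFd(const MoveOnlyFd&) = delete;
    MoveOnlyFd& operator=(const MoveOnlyFd&) = delete;
    MoveOnlyFd(MoveOnlyFd&& o) noexcept : fd(std::exchange(o.fd, -1)) {}
    MoveOnlyFd& operator=(MoveOnlyFd&& o) noexcept {
        if (this != &o) {
            if (fd != -1) close(fd);
            fd = std::exchange(o.fd, -1);  // the source no longer owns the descriptor
        }
        return *this;
    }
    ~MoveOnlyFd() { if (fd != -1) close(fd); }
};

// Assigns from a temporary and reports whether the descriptor is still open.
template <typename Holder>
bool survives_assignment() {
    int p[2];
    if (pipe(p) != 0) return false;
    Holder h;
    h = Holder(p[0]);  // the temporary is destroyed at the end of this statement
    bool still_open = fd_is_open(h.fd);
    close(p[1]);
    return still_open;
}
```

With the copyable holder the descriptor is already closed right after the assignment; with the move-only holder it stays open. Constructing the object in place, or holding it through std::unique_ptr, would avoid the temporary altogether.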

InShmemIface.hpp


#include <cerrno>    // errno
#include <chrono>
#include <cstdint>   // uint8_t
#include <cstring>   // strerror
#include <fcntl.h>
#include <iostream>
#include <string>
#include <sys/mman.h>
#include <sys/stat.h>
#include <thread>
#include <unistd.h>
#include <vector>

#define A_BUFF_READY 1
#define B_BUFF_READY 2
#define INIT_READY 10
#define TERMINATE 255
#define SLEEP_TIME_BETWEEN_READ_ATTEMPTS 0.01

class InShmemIface {
private:
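    // Default member initializers keep every constructor path in a defined state,
    // so the destructor never closes or unmaps an uninitialized handle.
    double num_read_attempts = 0.0;
    std::string shmem_name;
    int fw_ctr_fifo = -1;
    int bw_ctr_fifo = -1;
    int shmem_A = -1;
    int shmem_B = -1;
    size_t shmem_size = 0;

public:
    bool init_ok = false;
    uint8_t* buffer_A = nullptr;
    uint8_t* buffer_B = nullptr;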

    InShmemIface() {
        init_ok = false;
        num_read_attempts = 0.0;
        shmem_name = "";
        fw_ctr_fifo = -1;
        bw_ctr_fifo = -1;
        shmem_A = -1;
        shmem_B = -1;
        shmem_size = 0;
        buffer_A = nullptr;
        buffer_B = nullptr;
    }

    InShmemIface(const std::string& shmem_name, const std::string& ctr_fifo_path = "_data_control/", double read_timeout = 0.0) {
        init_ok = true;
        num_read_attempts = read_timeout / SLEEP_TIME_BETWEEN_READ_ATTEMPTS;
        this->shmem_name = shmem_name;
        fw_ctr_fifo = open((ctr_fifo_path + "fw_" + shmem_name).c_str(), O_RDONLY | O_NONBLOCK);
        bw_ctr_fifo = open((ctr_fifo_path + "bw_" + shmem_name).c_str(), O_WRONLY);

        if (fw_ctr_fifo == -1 || bw_ctr_fifo == -1) {
            std::cerr << "Failed to open control fifos: " << strerror(errno) << std::endl;
            init_ok = false;
        }

        if (fw_ctr_fifo != -1) {
            int signal = read_fw_ctr_fifo();
            if (signal == INIT_READY) {
                shmem_A = shm_open((shmem_name + "_A").c_str(), O_RDONLY, 0);
                shmem_B = shm_open((shmem_name + "_B").c_str(), O_RDONLY, 0);

                if (shmem_A == -1 || shmem_B == -1) {
                    std::cerr << "Failed to open shared memories: " << strerror(errno) << std::endl;
                    init_ok = false;
                }

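                if (init_ok) {
                    struct stat statbuf;
                    fstat(shmem_A, &statbuf);
                    // Assign the member; a local of the same name would leave it unset for munmap.
                    shmem_size = static_cast<size_t>(statbuf.st_size);

                    buffer_A = static_cast<uint8_t*>(mmap(NULL, shmem_size, PROT_READ, MAP_SHARED, shmem_A, 0));
                    buffer_B = static_cast<uint8_t*>(mmap(NULL, shmem_size, PROT_READ, MAP_SHARED, shmem_B, 0));

                    // mmap signals failure with MAP_FAILED, not nullptr.
                    if (buffer_A == MAP_FAILED) { buffer_A = nullptr; init_ok = false; }
                    if (buffer_B == MAP_FAILED) { buffer_B = nullptr; init_ok = false; }
                }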
            } else {
                init_ok = false;
            }
        }
    }

    ~InShmemIface() {
        destory_sm_buffer();
    }

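    void send_ctr_buff_ready(int active_buffer_index) {
        // shmemIface.py packs control signals as one unsigned byte ('B'),
        // so send exactly one byte rather than sizeof(int).
        uint8_t signal;
        if (active_buffer_index == 0) {
            signal = A_BUFF_READY;
        } else if (active_buffer_index == 1) {
            signal = B_BUFF_READY;
        } else {
            return;
        }
        if (write(bw_ctr_fifo, &signal, sizeof(signal)) != 1) {
            std::cerr << "Failed to write control signal: " << strerror(errno) << std::endl;
        }
    }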

    void destory_sm_buffer() {
        if (buffer_A != nullptr) {
            munmap(buffer_A, shmem_size);
        }

        if (buffer_B != nullptr) {
            munmap(buffer_B, shmem_size);
        }

        if (shmem_A != -1) {
            close(shmem_A);
        }

        if (shmem_B != -1) {
            close(shmem_B);
        }

        if (fw_ctr_fifo != -1) {
            close(fw_ctr_fifo);
        }

        if (bw_ctr_fifo != -1) {
            close(bw_ctr_fifo);
        }
    }

    int wait_buff_free() {
        int signal = read_fw_ctr_fifo();
        if (signal == A_BUFF_READY) {
            return 0;
        } else if (signal == B_BUFF_READY) {
            return 1;
        } else if (signal == TERMINATE) {
            return TERMINATE;
        } else {
            return -1;
        }
    }

    int read_fw_ctr_fifo() {
        for (int i = 0; i < num_read_attempts; i++) {
            uint8_t signal = 0;
            ssize_t bytes_read = read(fw_ctr_fifo, &signal, sizeof(signal));
            if (bytes_read == 1) {
                return signal;
            }
            // -1 (e.g. EAGAIN: nothing queued yet) or 0 (no writer connected): wait and retry.
            std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(SLEEP_TIME_BETWEEN_READ_ATTEMPTS * 1000)));
        }
        return -1;
    }
};
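
A side note on the wire format: the tracebacks show shmemIface.py packing and unpacking each control signal with 'B', i.e. one unsigned byte, and reading one byte at a time. Anything written to the control FIFOs from C++ should therefore be exactly one byte long. The sketch below, using an anonymous pipe and hypothetical helper names, counts how many one-byte reads are needed to drain a signal stored in an int versus one stored in a uint8_t.

```cpp
#include <cstddef>
#include <cstdint>
#include <unistd.h>

// Writes len bytes from data into a fresh pipe and counts how many
// one-byte reads the other end needs to drain it.
std::size_t one_byte_reads_needed(const void* data, std::size_t len) {
    int p[2];
    if (pipe(p) != 0) return 0;
    ssize_t written = write(p[1], data, len);
    close(p[1]);  // with the writer closed, read() returns 0 once the pipe is drained
    std::size_t count = 0;
    std::uint8_t byte = 0;
    while (written > 0 && read(p[0], &byte, 1) == 1) {
        ++count;
    }
    close(p[0]);
    return count;
}

// Signal value 1 (A_BUFF_READY) stored in an int.
std::size_t reads_for_int_signal() {
    int signal = 1;
    return one_byte_reads_needed(&signal, sizeof(signal));
}

// The same signal value stored in a single byte.
std::size_t reads_for_byte_signal() {
    std::uint8_t signal = 1;
    return one_byte_reads_needed(&signal, sizeof(signal));
}
```

A peer that reads one byte per signal would see the int version as several separate signals, most of them zero bytes, which is not a value the protocol defines.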

UPD:
After adding this minimal example and running it with only Heimdall DAQ started, the program also crashes:
Firmware/_daq_core/shmemIface.py

if __name__ == '__main__':
    shmem = inShmemIface('delay_sync_iq')
    print(shmem.wait_buff_free())

sudo python shmemIface.py 

0
/usr/lib/python3.8/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 2 leaked shared_memory objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '...

I.e. there’s a ready buffer, but a crash still occurs. I’ve added debug messages to send_ctr_buff_ready and wait_buff_free in both in and out shmem interface classes. Here are the logs:
delay_sync.log

INFO:__main__:IQ adjustment vector: abs:[1. 1. 1. 1. 1.]
INFO:__main__:IQ adjustment vector: phase:[0. 0. 0. 0. 0.]
INFO:__main__:Antenna channles 5
INFO:__main__:IQ samples per channel 1048576
INFO:__main__:Delay synchronizer initialized
INFO:__main__:open_interfaces
WARNING:shmemIface:Shared memory not exist
WARNING:shmemIface:Shared memory not exist
WARNING:shmemIface:Shared memory not exist
WARNING:shmemIface:Shared memory not exist
INFO:shmemIface:IN: Waiting for buffer free: 
INFO:shmemIface:decimator_out
INFO:shmemIface:OUT: Waiting for buffer free: 
INFO:shmemIface:delay_sync_iq
INFO:shmemIface:OUT: Waiting for buffer free: 
INFO:shmemIface:delay_sync_hwc
DEBUG:__main__:Type:0, CPI: 0, State:STATE_INIT
INFO:__main__:Delay track statistic [sync fails ,sample, iq, total][0,0,0/3]
INFO:shmemIface:OUT: Sending buffer ready: 
INFO:shmemIface:delay_sync_iq
INFO:shmemIface:OUT: Sending buffer ready: 
INFO:shmemIface:delay_sync_hwc
INFO:shmemIface:IN: Sending buffer ready: 
INFO:shmemIface:decimator_out
INFO:shmemIface:IN: Waiting for buffer free: 
INFO:shmemIface:decimator_out
INFO:shmemIface:OUT: Waiting for buffer free: 
INFO:shmemIface:delay_sync_iq
INFO:shmemIface:OUT: Waiting for buffer free: 
INFO:shmemIface:delay_sync_hwc
DEBUG:__main__:Type:1, CPI: 1, State:STATE_INIT
INFO:shmemIface:OUT: Sending buffer ready: 
INFO:shmemIface:delay_sync_iq
Traceback (most recent call last):
  File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/delay_sync.py", line 821, in <module>
    delay_synchronizer_inst0.start()
  File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/delay_sync.py", line 788, in start
    self.out_shmem_iface_iq.send_ctr_buff_ready(active_buffer_index_iq)
  File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/shmemIface.py", line 94, in send_ctr_buff_ready
    os.write(self.fw_ctr_fifo, pack('B',B_BUFF_READY))
BrokenPipeError: [Errno 32] Broken pipe
/home/user/miniforge3/envs/kraken/lib/python3.9/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 6 leaked shared_memory objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '
/home/user/miniforge3/envs/kraken/lib/python3.9/multiprocessing/resource_tracker.py:229: UserWarning: resource_tracker: '/delay_sync_iq_A': [Errno 2] No such file or directory: '/delay_sync_iq_A'
  warnings.warn('resource_tracker: %r: %s' % (name, e))
/home/user/miniforge3/envs/kraken/lib/python3.9/multiprocessing/resource_tracker.py:229: UserWarning: resource_tracker: '/delay_sync_iq_B': [Errno 2] No such file or directory: '/delay_sync_iq_B'
  warnings.warn('resource_tracker: %r: %s' % (name, e))

hwc.log

INFO:__main__:Opening control interface server on ip address: - and port: 5001
INFO:__main__:Antenna channles 5
INFO:__main__:IQ samples per channel 1048576
INFO:__main__:Processing sample size: 8192
INFO:__main__:ADPIS state: 0
WARNING:__main__:Reference channel index is fixed 0 
INFO:__main__:Waiting for new connection
INFO:__main__:Hardware Controller initialized
INFO:shmemIface:IN: Waiting for buffer free: 
INFO:shmemIface:delay_sync_hwc
DEBUG:__main__:Type:0, CPI: 0, State:STATE_INIT
DEBUG:__main__:Channel 0 power:0.00 dB, gain:496 [0]
DEBUG:__main__:Channel 1 power:0.00 dB, gain:496 [0]
DEBUG:__main__:Channel 2 power:0.00 dB, gain:496 [0]
DEBUG:__main__:Channel 3 power:0.00 dB, gain:496 [0]
DEBUG:__main__:Channel 4 power:0.00 dB, gain:496 [0]
INFO:__main__:Send Ch 0 Gain: 0 [0]
INFO:__main__:Send Ch 1 Gain: 0 [0]
INFO:__main__:Send Ch 2 Gain: 0 [0]
INFO:__main__:Send Ch 3 Gain: 0 [0]
INFO:__main__:Send Ch 4 Gain: 0 [0]
DEBUG:__main__:Received reply: b'ok'
DEBUG:__main__:Received reply: b'ok'
INFO:shmemIface:IN: Sending buffer ready: 
INFO:shmemIface:delay_sync_hwc
INFO:shmemIface:IN: Waiting for buffer free: 
INFO:shmemIface:delay_sync_hwc
Traceback (most recent call last):
  File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/hw_controller.py", line 704, in <module>
    HWC_inst0.start()
  File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/hw_controller.py", line 415, in start
    active_buff_index = self.in_shmem_iface.wait_buff_free()            
  File "/home/user/krakensdr_doa/heimdall_daq_fw/Firmware/_daq_core/shmemIface.py", line 196, in wait_buff_free
    signal = unpack('B', os.read(self.fw_ctr_fifo, 1))[0]
struct.error: unpack requires a buffer of 1 bytes
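
Both tracebacks match ordinary pipe behaviour when the peer goes away. Writing to a FIFO that no longer has a reader fails with EPIPE, which Python reports as BrokenPipeError. Reading from a FIFO whose writer has closed returns zero bytes, and unpack('B', ...) then complains that it needs a buffer of 1 byte. So these errors would be expected whenever the consumer of delay_sync_iq closes its descriptors or exits, for example a script that returns right after printing one result; that reading of the logs is an assumption on my part, not something I have verified inside Heimdall. A small sketch of both effects on an anonymous pipe, with hypothetical function names:

```cpp
#include <cerrno>
#include <csignal>
#include <cstdint>
#include <unistd.h>

// Writes one byte into a pipe whose read end is already closed and returns
// the resulting errno (0 if the write unexpectedly succeeds).
int errno_when_reader_is_gone() {
    std::signal(SIGPIPE, SIG_IGN);  // report EPIPE instead of killing the process
    int p[2];
    if (pipe(p) != 0) return -1;
    close(p[0]);                    // the reader goes away
    std::uint8_t signal_byte = 1;
    errno = 0;
    ssize_t n = write(p[1], &signal_byte, 1);
    int err = (n == -1) ? errno : 0;
    close(p[1]);
    return err;
}

// Reads from a pipe whose write end is already closed and returns read()'s result.
ssize_t read_result_when_writer_is_gone() {
    int p[2];
    if (pipe(p) != 0) return -1;
    close(p[1]);                    // the writer goes away
    std::uint8_t signal_byte = 0;
    ssize_t n = read(p[0], &signal_byte, 1);
    close(p[0]);
    return n;
}
```

The first function returns EPIPE and the second returns 0, which correspond to the delay_sync and hwc failures respectively.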

Hi, unfortunately, this might be a bit difficult for me to answer as I did not write the shmem code and I’m not exactly sure what the problem could be.

I’ll ask the developer to take a look at your results, but it might be a while before he can get to you.

BTW, in my opinion the shared memory interface is not important to implement if you already have the socket interface working. The shared memory interface was created as an alternative to the socket interface solely to improve performance on low-powered hardware like the Pi 4 which struggled with some slowdowns from the high data transfer rate through sockets.

But if you’re running on anything more powerful, the speed benefits of shared memory are probably not going to be noticeable compared to sockets.