diff --git a/CMakeLists.txt b/CMakeLists.txt index 74f54c6f..7e301976 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ # Based on what I learnt from https://cliutils.gitlab.io/modern-cmake -cmake_minimum_required(VERSION 3.14...3.19) +cmake_minimum_required(VERSION 3.28) # Create a release output by default set(default_build_type "Release") @@ -33,8 +33,14 @@ set(UPM_CLI_VERSION 0.9) message("Building udpPacketManager Version ${PROJECT_VERSION}\n\n") -set(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} -march=native -O2 -ffast-math") -set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -march=native -O2 -ffast-math") +if (NOT "$ENV{CPU_OPTS}" STREQUAL "") + set(CPU_OPTS "$ENV{CPU_OPTS}" CACHE STRING "Set from ENV") +else() + set(CPU_OPTS "-march=native" CACHE STRING "Set by default") +endif() + +set(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} ${CPU_OPTS} -O2 -ffast-math") +set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} ${CPU_OPTS} -O2 -ffast-math") set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -Wstrict-prototypes -Wmissing-prototypes -Wframe-larger-than=4095 -Wshadow -Wconversion -Wno-sign-conversion -Wall -Wextra -g3") set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Wframe-larger-than=4095 -Wshadow -Wconversion -Wno-sign-conversion -Wall -Wextra -g3") @@ -77,9 +83,14 @@ add_custom_target(install_python_requirements message("Configuring OpenMP...") find_package(OpenMP REQUIRED) -execute_process(COMMAND bash -c "cat /proc/cpuinfo | uniq | grep -m 2 \"siblings\" | cut -d \":\" -f 2 | sort --numeric --unique | xargs echo" +execute_process(COMMAND bash -c "cat /proc/cpuinfo | uniq | grep -m 2 \"siblings\" | cut -d \":\" -f 2 | sort --numeric --unique | head -n 1" OUTPUT_VARIABLE OMP_THREADS + RESULT_VARIABLE CPU_COUNT_RET ) +if ("${OMP_THREADS}" LESS_EQUAL 1 OR "${OMP_THREADS} " STREQUAL " ") + message("WARNING: Failed to automatically determine a reasonable number of default threads, setting default to 8") + set(OMP_THREADS 8) +endif() #if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU") if (OpenMP_C_LIB_NAMES MATCHES ".*(gomp).*") @@ -99,7 +110,8 @@ set(ZSTD_BUILD_STATIC ON CACHE INTERNAL "") set(ZSTD_BUILD_SHARED OFF CACHE INTERNAL "") FetchContent_Declare(zstd GIT_REPOSITORY https://github.com/facebook/zstd.git - GIT_TAG v1.5.5 + GIT_TAG v1.5.7 + EXCLUDE_FROM_ALL ) FetchContent_MakeAvailable(zstd) # MakeAvailable doesn't allow for libzstd_static to be used as a target @@ -125,8 +137,9 @@ ExternalProject_Add(internal_zlib CONFIGURE_COMMAND ./configure --static --prefix=./ BUILD_COMMAND make test install -j8 BUILD_IN_SOURCE TRUE # Keep source beside install prefix - INSTALL_COMMAND "" + INSTALL_COMMAND cmake -E echo "Skipping zlib install step" UPDATE_COMMAND "" + BUILD_BYPRODUCTS ${CMAKE_CURRENT_BINARY_DIR}/internal_zlib-prefix/src/internal_zlib/libz.a ) ExternalProject_Get_Property(internal_zlib INSTALL_DIR) set(zlib_INSTALL_DIR ${INSTALL_DIR}) @@ -135,14 +148,14 @@ set_property(TARGET libz PROPERTY IMPORTED_LOCATION ${zlib_INSTALL_DIR}/src/inte add_dependencies(libz internal_zlib) message("") ExternalProject_Add(internal_hdf5 - GIT_REPOSITORY https://github.com/HDFGroup/hdf5 - GIT_TAG hdf5-1_12_2 - CONFIGURE_COMMAND ./configure --disable-dependency-tracking --enable-optimization=high --disable-shared --disable-cxx --disable-hl --disable-hltools --disable-tools --enable-threadsafe --disable-fortran --enable-build-mode=production --with-zlib=${zlib_INSTALL_DIR}/src/internal_zlib/ + URL https://github.com/HDFGroup/hdf5/archive/refs/tags/hdf5-1_12_3.zip + CONFIGURE_COMMAND ./configure --disable-dependency-tracking --enable-optimization=high --disable-shared --disable-cxx --disable-hl --disable-tools --enable-threadsafe --disable-fortran --enable-build-mode=production --with-zlib=${zlib_INSTALL_DIR}/src/internal_zlib/ BUILD_COMMAND make all -j8 BUILD_IN_SOURCE TRUE # Header gets moved to incorrect folder otherwise - INSTALL_COMMAND "" + INSTALL_COMMAND cmake -E echo "Skipping hdf5 install step" UPDATE_COMMAND "" DEPENDS internal_zlib + BUILD_BYPRODUCTS ${CMAKE_CURRENT_BINARY_DIR}/internal_hdf5-prefix/src/internal_hdf5/src/.libs/libhdf5.a ) ExternalProject_Get_Property(internal_hdf5 INSTALL_DIR) set(hdf5_INSTALL_DIR ${INSTALL_DIR}) @@ -158,9 +171,10 @@ ExternalProject_Add(internal_bitshuffle CONFIGURE_COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/src/misc/bitshuffleMakefile Makefile BUILD_COMMAND CMAKE_BASE_DIR=${CMAKE_CURRENT_BINARY_DIR} make link -j4 BUILD_IN_SOURCE TRUE - INSTALL_COMMAND "" + INSTALL_COMMAND cmake -E echo "Skipping bitshuffle install step" UPDATE_COMMAND "" DEPENDS internal_hdf5 zstd internal_zlib + BUILD_BYPRODUCTS ${CMAKE_CURRENT_BINARY_DIR}/internal_bitshuffle-prefix/src/internal_bitshuffle/libh5bshuf.a ) ExternalProject_Get_Property(internal_bitshuffle INSTALL_DIR) set(bitshuffle_INSTALL_DIR ${INSTALL_DIR}) @@ -171,21 +185,13 @@ message("") message ("Configuring PSRDADA") -# Find the PSRDADA libraries on the system (or set NODADA if not found) -ExternalProject_Add(internal_PSRDADA - GIT_REPOSITORY git://git.code.sf.net/p/psrdada/code - GIT_TAG ba2b88 - BUILD_IN_SOURCE TRUE - CONFIGURE_COMMAND ./bootstrap && ./configure --with-cuda-dir=no && cd 3rdparty && make libtimers.la # libtimers fails to compile during normal step with older Make versions - BUILD_COMMAND cd src/ && make libpsrdada.la dada_db dada_dbmeminfo dada_dbmetric dada_dbmonitor -j4 - INSTALL_COMMAND "" - UPDATE_COMMAND "" +# Disable CUDA for the PSRDADA compile +FetchContent_Declare(psrdada_src + GIT_REPOSITORY https://github.com/mirror-psrdada/mirror-psrdada-code.git + GIT_TAG 1e48b65e90b279683134f91395668b38e58f7646 + EXCLUDE_FROM_ALL ) -ExternalProject_Get_Property(internal_PSRDADA INSTALL_DIR) -set(psrdada_INSTALL_DIR ${INSTALL_DIR}) -add_library(libpsrdada STATIC IMPORTED) -set_property(TARGET libpsrdada PROPERTY IMPORTED_LOCATION ${psrdada_INSTALL_DIR}/src/internal_PSRDADA/src/.libs/libpsrdada.a) -add_dependencies(libpsrdada internal_PSRDADA) +FetchContent_MakeAvailable(psrdada_src) message("") @@ -199,8 +205,9 @@ ExternalProject_ADD(internal_FFTW3F CONFIGURE_COMMAND ./configure --disable-dependency-tracking --enable-float --enable-openmp --enable-static --enable-sse2 --enable-avx --enable-avx2 --enable-avx512 --enable-avx-128-fma BUILD_COMMAND make -j4 BUILD_IN_SOURCE TRUE - INSTALL_COMMAND "" + INSTALL_COMMAND cmake -E echo "Skipping FFTW3 install step" UPDATE_COMMAND "" + BUILD_BYPRODUCTS ${CMAKE_CURRENT_BINARY_DIR}/internal_FFTW3F-prefix/src/internal_FFTW3F/threads/.libs/libfftw3f_omp.a ${CMAKE_CURRENT_BINARY_DIR}/internal_FFTW3F-prefix/src/internal_FFTW3F/.libs/libfftw3f.a ) ExternalProject_Get_Property(internal_FFTW3F install_dir) set(fftw3f_INSTALL_DIR ${install_dir}) @@ -241,35 +248,35 @@ add_library(lofudpman STATIC src/lib/lofar_udp_io.c src/lib/lofar_udp_reader.c src/lib/lofar_udp_structs.c - src/CLI/lofar_cli_meta.c src/lib/lofar_udp_metadata.c - src/lib/lofar_udp_time.c) + src/lib/lofar_udp_time.c + src/CLI/lofar_cli_meta.c) -add_dependencies(lofudpman libzstd_static libpsrdada libhdf5 libz libh5bshuf install_python_requirements) # libfftw3fomp) ##yaml) #CSpice::cspice) + +add_dependencies(lofudpman libzstd_static psrdada libhdf5 libz libh5bshuf install_python_requirements dada_db dada_dbmeminfo dada_dbmetric dada_dbmonitor) # libfftw3fomp) ##yaml) #CSpice::cspice) # Include all of our library headers target_include_directories(lofudpman PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/src/lib ${CMAKE_CURRENT_BINARY_DIR}/src/lib # Configured headers - ${CMAKE_CURRENT_SOURCE_DIR}/src/CLI +# ${CMAKE_CURRENT_SOURCE_DIR}/src/CLI ${CMAKE_CURRENT_BINARY_DIR}/src/CLI # Configured headers ${CMAKE_CURRENT_SOURCE_DIR}/src/metadata - ) # Include all of our dependency headers target_include_directories(lofudpman PUBLIC ${fftw3f_INSTALL_DIR}/src/internal_FFTW3F/api) target_include_directories(lofudpman PUBLIC ${hdf5_INSTALL_DIR}/src/internal_hdf5/src/ ${zlib_INSTALL_DIR}/src/internal_zlib/ ${bitshuffle_INSTALL_DIR}/src/internal_bitshuffle/src/) -target_include_directories(lofudpman PUBLIC ${psrdada_INSTALL_DIR}/src/internal_PSRDADA/src/) +target_include_directories(lofudpman PUBLIC ${psrdada_src_SOURCE_DIR}/src/) target_include_directories(lofudpman PUBLIC ${zstd_SOURCE_DIR}/lib/) #target_include_directories(lofudpman PUBLIC ${yaml_SOURCE_DIR}/include/) -file(GLOB_RECURSE UDP_INCLUDE_FILES "${CMAKE_CURRENT_SOURCE_DIR}/src/*/*.h" "${CMAKE_CURRENT_SOURCE_DIR}/src/*/*.hpp" "${CMAKE_CURRENT_BINARY_DIR}/src/*/*.h" "${CMAKE_CURRENT_BINARY_DIR}/src/*/*.hpp") +file(GLOB_RECURSE UDP_INCLUDE_FILES "${CMAKE_CURRENT_SOURCE_DIR}/src/lib/*.h" "${CMAKE_CURRENT_SOURCE_DIR}/src/lib/*.hpp" "${CMAKE_CURRENT_BINARY_DIR}/src/lib/*.h" "${CMAKE_CURRENT_BINARY_DIR}/src/lib/*.hpp" "${CMAKE_CURRENT_BINARY_DIR}/src/CLI/*h") # FFTW requires a libm (math) link find_library(LIB_M m REQUIRED) target_link_libraries(lofudpman PUBLIC libfftw3f_omp libfftw3f ${LIB_M}) target_link_libraries(lofudpman PUBLIC libhdf5 libz libh5bshuf ${CMAKE_DL_LIBS}) # Some cases need libdl for HDF5 -target_link_libraries(lofudpman PUBLIC libpsrdada) +target_link_libraries(lofudpman PUBLIC psrdada) target_link_libraries(lofudpman PUBLIC libzstd_static) find_library(LIB_RT rt REQUIRED) # Required for shmem functions @@ -293,8 +300,8 @@ target_link_libraries(lofudpman PUBLIC OpenMP::OpenMP_CXX OpenMP::OpenMP_C) # Setup the CLIs add_executable(lofar_udp_extractor ${CMAKE_CURRENT_SOURCE_DIR}/src/CLI/lofar_cli_extractor.c) add_executable(lofar_stokes_extractor ${CMAKE_CURRENT_SOURCE_DIR}/src/CLI/lofar_cli_stokes.c) -target_link_libraries(lofar_udp_extractor PUBLIC lofudpman) -target_link_libraries(lofar_stokes_extractor PUBLIC lofudpman) +target_link_libraries(lofar_udp_extractor PUBLIC lofudpman lofudpman) +target_link_libraries(lofar_stokes_extractor PUBLIC lofudpman lofudpman) include(CMakePackageConfigHelpers) @@ -312,8 +319,14 @@ install(TARGETS lofudpman lofar_udp_extractor lofar_stokes_extractor ) install(FILES ${UDP_INCLUDE_FILES} DESTINATION include) install(PROGRAMS ${CMAKE_CURRENT_SOURCE_DIR}/src/misc/dreamBeamJonesGenerator.py DESTINATION bin) +install(PROGRAMS + ${psrdada_src_BINARY_DIR}/apps/dada_db + ${psrdada_src_BINARY_DIR}/apps/dada_dbmeminfo + ${psrdada_src_BINARY_DIR}/apps/dada_dbmetric + ${psrdada_src_BINARY_DIR}/apps/dada_dbmonitor + DESTINATION bin) # Add the tests directory enable_testing() set(TEST_ENV_PATH ${CMAKE_CURRENT_SOURCE_DIR}/tests/lib_tests/) -add_subdirectory(tests) \ No newline at end of file +add_subdirectory(tests) diff --git a/build.sh b/build.sh index 377209ef..85392d36 100755 --- a/build.sh +++ b/build.sh @@ -1,13 +1,6 @@ -rm -rf ./build -mkdir build -cd build +#!/usr/bin/env bash +set -e -echo "Preparing build with CC=${CC} CXX=${CXX} LD=${LD}" -cmake -DCMAKE_BUILD_TYPE=Release .. -cmake --build . --target all --config Release - -cmake --install . - -if [[ $1 -eq 1 ]]; then - ctest -V . -fi +bash ./build_prep.sh "${1}" +bash ./build_compile.sh "${1}" +bash ./build_install.sh "${1}" diff --git a/build_compile.sh b/build_compile.sh new file mode 100755 index 00000000..d4ab2090 --- /dev/null +++ b/build_compile.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +set -e +cd build + +if [[ $1 -eq 2 ]]; then + export CPU_OPTS="-march=x86-64" +fi + +cmake --build . --target all --config Release \ No newline at end of file diff --git a/build_docker.sh b/build_docker.sh old mode 100644 new mode 100755 index 1da34556..d5eeb02e --- a/build_docker.sh +++ b/build_docker.sh @@ -14,7 +14,7 @@ echo "Building Docker image with tag lofar-upm:latest from $LOCAL_DIR" docker build -f src/docker/Dockerfile -t lofar-upm:latest . if [ "$#" -eq 2 ]; then - echo "Building singulairty image, outputting in directory $1" + echo "Building singularity image, outputting in directory $1" touch "$1"/test if [ "$?" -ne 0 ]; then diff --git a/build_install.sh b/build_install.sh new file mode 100644 index 00000000..f82cdd32 --- /dev/null +++ b/build_install.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +set -e + +cd build +cmake --install . +exitCode="$?" + +if [[ $1 -gt 0 ]]; then + ctest -V . + exitTest="$?" + if [[ $exitCode -eq 0 ]]; then + exitCode="${exitTest}" + fi +fi + +exit ${exitCode} \ No newline at end of file diff --git a/build_prep.sh b/build_prep.sh new file mode 100755 index 00000000..099c0a0a --- /dev/null +++ b/build_prep.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +rm -rf ./build + +set -e + +mkdir build +cd build + +if [[ $1 -eq 2 ]]; then + export CPU_OPTS="-march=x86-64" +fi + +echo "Preparing build with CC=${CC} CXX=${CXX}" +cmake -DCMAKE_BUILD_TYPE=Release .. \ No newline at end of file diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md new file mode 100644 index 00000000..ec63f1a6 --- /dev/null +++ b/docs/ARCHITECTURE.md @@ -0,0 +1,53 @@ +Architecture +============ + +This document is a high-level overview of the udpPacketManager library's internal workings, call tree and some of the unexpected design choices in parts of the library. + +The repo consists of four main components: +- [The library itself](#the-main-library) +- A mini [library of metadata writers](#metadata-writers) +- An [external Python executable](#python---dreambeam-wrapper) for interfacing with [dreamBeam](https://github.com/2baOrNot2ba/dreamBeam) to generate Jones matrices for calibration +- [A set of reference CLIs](#the-clis), which can also be used to process data + +# The Main Library +Interfacing with the library from a user perspective should only require three function calls, to setup a library for processing, to processed the input and then finally to cleanup any memory allocated for processing. + +These are the functions +- lofar_udp_reader_setup() +- lofar_udp_reader_step() or lofar_udp_reader_step_timed() +- lofar_udp_reader_cleanup() + +All of these functions can be found in the [lofar_udp_reader.c](../src/lib/lofar_udp_reader.c) file. + + + +To save on compute time, while the library does have a verbose output, it is only enabled in debug builds. It can be forcibly enabled in all builds by appending `-DVERBOSE` to your `CFLAGS` enviroment variable. + +### lofar_udp_reader_setup() +The *lofar_udp_reader_setup* function takes in a configuration struct defined in [lofar_udp_structs.c](../src/lib/lofar_udp_structs.h), opens the input datastreams, reads in data and populates as much metadata as it can from the input packet stream. It returns the pointer to a *lofar_udp_reader* struct which contains all of the informaiton and alocated buffers required to read in new data and re-order it into a set output. + +Calling this function will read in a standard gulp of data from the set number of ports, find the lowest common packet after a given starting point that is present on all ports of data and align them. This function does not re-order the data into an output, this will only occur after the first *lofar_udp_reader_step*-like function is called. + + +### lofar_udp_reader_step() +This function performs a combined call to the read function in [`lofar_udp_io.c`](../src/lib/lofar_udp_io.c) to get new input data and then passes that input data to the C++ function in [lofar_udp_backends.cpp](../src/lib/lofar_udp_backends.cpp), which handles re-ordering the data to the desired output. + +In the case that there is large packet loss, a significant number of out-of-order packets or the input returns an end of stream marker, this function will return a non-zero value. + +### lofar_udp_reader_cleanup() +This function calls `free` on all dynamically allocated memory, closes input data streams and generals cleans up after the library. + + +### Implementation details +The input read buffers are padded in two cases: +- They are always extended by 2 packets to hold both a reference packet from a previous iteration, and a blank packet for the case that packets are not replayed when packets are dropped +- If the input is from a zstd compressed file, the decompression system expects a given output size, so we need to extend the input buffer size to align with the expected size (only needed in cases where the packets per iteration isn't a multiple of 2) + +# Metadata Writers + + + +# Python - dreamBeam Wrapper + +# The CLIs +The CLIs act as reference on how to use the library, as well as an interface to process an observation from raw CEP packets to (un)calibrated voltages or Stokes outputs, with optional metadata attached. \ No newline at end of file diff --git a/docs/README_CLI.md b/docs/README_CLI.md index 0bb061bf..72f7bb06 100644 --- a/docs/README_CLI.md +++ b/docs/README_CLI.md @@ -334,3 +334,22 @@ has an example implementation of extended downsampling for modes (10, 20). - Take the input data, apply (10, 20 or 30) and (\*\*0) to form a Stokes \* sample, and sum it with the next sample - N input files -> 1 output file (16x less output samples) + + +### Split Polarisation Power Modes + +An output of 2 files, containing the power for the X and Y polarisations has been added to test beam models. + +#### \*70: "Split Polarisation Powers" + +- Take the input data, apply (10, 20 or 30), and then combine the polarisation real/imaging values to form 2 output +32-bit floating point (power of X, Y) filterbanks for each frequency sample +- N input files -> 2 output files + +#### \*7\*: "Split Polarisation Powers with Nx downsampling" + +- Follows the same rules as the Stokes downsampling described above, using 1, 2, 3, 4 as the final value of the +processing mode to give 2^n downsampling. +- Take the input data, apply (10, 20 or 30), and then combine the polarisation real/imaging values, sum 2^n times to +generate 2 downsampled filterbanks. +- N input files -> 2 output files \ No newline at end of file diff --git a/docs/README_RAW_DATA_HANDLING.md b/docs/README_RAW_DATA_HANDLING.md new file mode 100644 index 00000000..aa42bdb8 --- /dev/null +++ b/docs/README_RAW_DATA_HANDLING.md @@ -0,0 +1,16 @@ +Using the CLI and Plotting the Outputs +====================================== + +This will be a summary of how to process the outputs of this program in one of two ways in Python: raw data handling with numpy (applies to all processing modes), or via [sigpyproc](https://github.com/pravirkr/sigpyproc). The numpy approach works best for modes below 100 (non-stokes outputs), though these modes can also be handled through sigpyproc if you are familiar with the indiexing you are dealing with. + +While all input, intermediate and output arrays are internally handed as chars (8-bit words), you will need to be aware of your data type when processing the outputs of the CLI. For modes below 100, the the output word size is the same as the input word size. However, for modes above 100 the output will be a float that is 4 times wider than the input, and the type is changed to a float. So an 8-bit input will generate a 32-bit floating point value (float) and 16-bit will generate a 64-bit floating point value (double). + +Using the CLI +============= + +The Numpy Approach +================== + + +The SigPyProc Approach +====================== diff --git a/paper/paper.bib b/paper/paper.bib index 5c00c724..d3396b7f 100644 --- a/paper/paper.bib +++ b/paper/paper.bib @@ -2,8 +2,7 @@ @misc{andersonLuMP title = {{{LOFAR}} Und {{MPIfR Pulsare}}}, shorttitle = {{{LuMP}}}, author = {Anderson, J. M.}, - year = {2013}, - url = {https://deki.mpifr-bonn.mpg.de/Cooperations/LOFAR/Software/LuMP/LuMP_version_2.0}, + year = {2013} } @article{artemis, @@ -31,26 +30,25 @@ @misc{cookbook author = {Virtanen, I. I.}, year = {2018}, urldate = {2021-02-26}, - url = {https://lofar.ie/wp-content/uploads/2018/03/station_data_cookbook_v1.2.pdf}, + howpublished = {https://lofar.ie/wp-content/uploads/2018/03/station\_data\_cookbook\_v1.2.pdf}, file = {/home/suddenly/Zotero/storage/XU4QZPQ4/station_data_cookbook_v1.2.pdf} } @misc{dreamBeam, title = {{{dreamBeam}}}, author = {Carozzi, T. D.}, - year = {2016-}, + year = {2023}, month = apr, urldate = {2023-04-27}, abstract = {Radio telescope beam modeling framework}, copyright = {ISC}, - keywords = {beam-models}, - url = {https://github.com/2baOrNot2ba/dreamBeam}, + keywords = {beam-models} } @article{dspsr, title = {{{DSPSR}}: {{Digital Signal Processing Software}} for {{Pulsar Astronomy}}}, shorttitle = {{{DSPSR}}}, - author = {{van Straten}, Willem and Bailes, M.}, + author = {{van Straten}, W. and Bailes, M.}, year = {2011}, month = jan, journal = {Publications of the Astronomical Society of Australia}, @@ -100,8 +98,7 @@ @article{icd003 author = {Alexov, A.` and Anderson, K. and B{\textasciidieresis}ahren, L. and Grie{\ss}meier, J-M and Hessels, J.W.T and Masters, J.S and Stappers, B.W.}, year = {2012}, langid = {english}, - file = {/home/suddenly/Zotero/storage/XTPDWHXT/Date - A. Alexov, K. Anderson, L. B¨ahren, J.-M. Grießmei.pdf}, - url = {https://www.astron.nl/lofarwiki/lib/exe/fetch.php?media=public:documents:lofar-usg-icd-003.pdf} + file = {/home/suddenly/Zotero/storage/XTPDWHXT/Date - A. Alexov, K. Anderson, L. B¨ahren, J.-M. Grießmei.pdf} } @misc{iltdada, @@ -146,8 +143,7 @@ @misc{mckb357 author = {{McKay-Bukowski}, D.}, year = {2013}, urldate = {2023-04-27}, - url = {https://www.lofar-uk.org/wiki/uploads/Main/kaira_mode357_obs210413.pdf}, - howpublished = {https://www.lofar-uk.org/wiki/uploads/Main/kaira_mode357_obs210413.pdf}, + howpublished = {https://www.lofar-uk.org/wiki/uploads/Main/kaira\_mode357\_obs210413.pdf}, file = {/home/suddenly/Zotero/storage/EHGQJYTJ/kaira_mode357_obs210413.pdf} } @@ -165,7 +161,7 @@ @article{mckennaRRAT @misc{pelican, title = {Pelican/Pelican-Lofar}, - author = {Mort, B. and Dulwich, F. and Williams, C. and Salvini, S. and Chennamangalam, J. and Karastergiou, A. and Magro, A. and Baehren, L.}, + author = {Developers, Pelican}, year = {2021}, month = sep, urldate = {2023-04-27}, @@ -175,7 +171,7 @@ @misc{pelican @article{psrdada, title = {{{PSRDADA}}: {{Distributed Acquisition}} and {{Data Analysis}} for {{Radio Astronomy}}}, shorttitle = {{{PSRDADA}}}, - author = {{van Straten}, Willem and Jameson, Andrew and Osłowski, Stefan}, + author = {{van Straten}, Willem and Jameson, Andrew and Os{\AA}‚owski, Stefan}, year = {2021}, month = oct, journal = {Astrophysics Source Code Library}, diff --git a/paper/paper.pdf b/paper/paper.pdf index 4bf06532..5c612fa0 100644 Binary files a/paper/paper.pdf and b/paper/paper.pdf differ diff --git a/src/CLI/lofar_cli_extractor.c b/src/CLI/lofar_cli_extractor.c index 02f1b547..998d67f9 100644 --- a/src/CLI/lofar_cli_extractor.c +++ b/src/CLI/lofar_cli_extractor.c @@ -13,6 +13,7 @@ void helpMessages() { //printf(); printf("-p: Processing mode, options listed below (default: 0)\n"); + printf("-m: Number of packets to process in each read request (default: 65536)\n"); processingModes(); @@ -60,25 +61,21 @@ int main(int argc, char *argv[]) { int8_t flagged = 0; // Standard ugly input flags parser - while ((inputOpt = getopt(argc, argv, "hzrqfvVi:o:m:M:I:u:t:s:S:e:p:a:n:b:ck:T:")) != -1) { + while ((inputOpt = getopt(argc, argv, "hrqfvVi:o:m:M:I:u:t:s:S:e:p:a:n:b:ck:T:")) != -1) { input = 1; switch (inputOpt) { case 'i': - strncpy(inputFormat, optarg, DEF_STR_LEN - 1); - inputProvided = 1; + parseInput(inputFormat, optarg, &inputProvided); break; case 'o': - if (lofar_udp_io_write_parse_optarg(outConfig, optarg) < 0) { + if (parseOutput(outConfig, config, optarg, &outputProvided) < 0) { helpMessages(); CLICleanup(config, outConfig, headerBuffer); return 1; } - // If the metadata is not yet set, see if we can parse a requested type from the output filename - if (config->metadata_config.metadataType == NO_META) config->metadata_config.metadataType = lofar_udp_metadata_parse_type_output(optarg); - outputProvided = 1; break; case 'm': @@ -134,8 +131,9 @@ int main(int argc, char *argv[]) { config->calibrateData = APPLY_CALIBRATION; break; - case 'z': - clock200MHz = 0; + case 'C': + config->calibrationDuration = strtof(optarg, &endPtr); + if (checkOpt(inputOpt, optarg, endPtr)) { flagged = 1; } break; case 'q': @@ -238,7 +236,8 @@ int main(int argc, char *argv[]) { return 1; } - + const float sampleTime = 1.0 / (clock200MHz ? CLOCK200MHZ : CLOCK160MHZ); + const float timePerGulp = (float) (config->packetsPerIteration * UDPNTIMESLICE) * sampleTime; if (silent == 0) { printf("LOFAR UDP Data extractor (v%s, lib v%s)\n\n", UPM_CLI_VERSION, UPM_VERSION); printf("=========== Given configuration ===========\n"); @@ -248,6 +247,7 @@ int main(int argc, char *argv[]) { printf("Output File: %s\n\n", outConfig->outputFormat); printf("Packets/Gulp:\t%ld\t\t\tPorts:\t%d\n\n", config->packetsPerIteration, config->numPorts); + printf("Time (s) /Gulp:\t%f\t\tTime (s)/Iteration:\t%f\n\n", timePerGulp, config->numPorts * timePerGulp); VERBOSE(printf("Verbose:\t%d\n", config->verbose);); printf("Proc Mode:\t%03d\t\t\tReader:\t%d\n\n", config->processingMode, config->readerType); printf("Beamlet limits:\t%d, %d\n\n", config->beamletLimits[0], config->beamletLimits[1]); @@ -312,7 +312,6 @@ int main(int argc, char *argv[]) { } - if (silent == 0) { lofar_udp_time_get_current_isot(reader, stringBuff, sizeof(stringBuff) / sizeof(stringBuff[0])); printf("\n\n=========== Reader Information ===========\n"); @@ -420,6 +419,7 @@ int main(int argc, char *argv[]) { if (silent == 0) { printf("Metadata processing for operation %ld after %f seconds.\n", loops, timing[2]); printf("Disk writes completed for operation %ld after %f seconds.\n", loops, timing[3]); + printf("Overall real-time factor of %.3fx\n", timePerGulp / (timing[0] + timing[1] + timing[2] + timing[3])); for (int idx = 0; idx < TIMEARRLEN; idx++) { timing[idx] = 0.; @@ -470,12 +470,14 @@ int main(int argc, char *argv[]) { for (int8_t port = 0; port < reader->meta->numPorts; port++) droppedPackets += reader->meta->portTotalDroppedPackets[port]; + const float processedTime = (packetsProcessed * UDPNTIMESLICE) * sampleTime; printf("Reader loop exited (%ld); overall process took %f seconds.\n", returnVal, TICKTOCK(tick, tock)); printf("We processed %ld packets, representing %.03lf seconds of data", packetsProcessed, - (float) (reader->meta->numPorts * packetsProcessed * UDPNTIMESLICE) * 5.12e-6f); + (float) (reader->meta->numPorts * packetsProcessed * UDPNTIMESLICE) * sampleTime); if (reader->meta->numPorts > 1) { - printf(" (%.03lf per port)\n", (float) (packetsProcessed * UDPNTIMESLICE) * 5.12e-6f); + printf(" (%.03lf per port)\n", processedTime); } else { printf(".\n"); } + printf("The data was processed with a real-time factor of %.2f\n.", processedTime / TICKTOCK(tick, tock)); printf("Total Read Time:\t%3.02lf s\t\t\tTotal CPU Ops Time:\t%3.02lf s\nTotal Write Time:\t%3.02lf s\t\t\tTotal MetaD Time:\t%3.02lf s\n", totalReadTime, totalOpsTime, totalWriteTime, totalMetadataTime); printf("Total Data Read:\t%3.03lf GB\t\tTotal Data Written:\t%3.03lf GB\n", diff --git a/src/CLI/lofar_cli_meta.c b/src/CLI/lofar_cli_meta.c index 88e01bda..1c0c9811 100644 --- a/src/CLI/lofar_cli_meta.c +++ b/src/CLI/lofar_cli_meta.c @@ -14,21 +14,21 @@ void sharedFlags(void) { printf("-i: Input file name format\n"); printf("-o: Output file name format\n"); - printf("-f: Append output files if they already exist (default: Exit if exists)\n"); + printf("-f: Append output files if they already exist (default: Exit if exists)\n"); printf("-I: Input metadata file (default: '')\n"); - printf("-c: Calibrate the data using the provided metadata.\n"); - printf("-M: Override output metadata format\n"); - printf("-m: Number of packets to process in each read request (default: 65536)\n"); + printf("-c: Calibrate the data from input metadata (default: false)\n"); + printf("-C: Duration (in seconds) to precalculate Jones matrices for (default: 3600.0 seconds).\n"); + printf("-M: Override output metadata format (default none, options SIGPROC, GUPPI, DADA, HDF5)\n"); printf("-u: Number of ports to combine (default: 4)\n"); printf("-n: Base value to iterate when choosing ports (default: 0)\n"); printf("-b: , Beamlets to extract from the input dataset. Lo is inclusive, hi is exclusive ( eg. 0,300 will return 300 beamlets, 0:299). (default: 0,0 === all)\n"); printf("-t: String of the time of the first requested packet, format YYYY-MM-DDTHH:mm:ss (default: '')\n"); printf("-s: Maximum number of seconds of raw data to extract/process (default: all)\n"); printf("-S: Break into a new file every N given iterations (default: infinite, never break)\n"); - printf("-r: Replay the previous packet when a dropped packet is detected (default: pad with 0 values)\n"); + printf("-r: Replay the previous packet when a dropped packet is detected (default: pad with 0 values)\n"); printf("-T: OpenMP Threads to use during processing (8+ highly recommended, default: %d)\n", OMP_THREADS); - printf("-q: Enable silent mode for the CLI, don't print any information outside of library error messages (default: False)\n"); + printf("-q: Enable silent mode for the CLI, don't print any information outside of library error messages (default: False)\n"); VERBOSE(printf("-v: Enable verbose output (default: False)\n"); printf("-V: Enable highly verbose output (default: False)\n")); } @@ -61,11 +61,11 @@ void processingModes(void) { printf("Stokes outputs can be decimated in orders of 2, up to 16x by adjusting the last digit of their processing mode.\n"); printf("This is handled in orders of two, so 101 will give a Stokes I with 2x decimation, 102, will give 4x, 103 will give 8x and 104 will give 16x.\n\n"); - printf("Similarly, by default the beamlet order is reversed (negative frequency offset). By adding 100 to each Stokes output, e.g. changing mode 104 to 204," - "the frequency order will be changed to be increasing (positive frequency offset, matching the telescope configuration).\n"); + printf("Similarly, by default the data order is beamlet-major, increasing frequency. By adding 100 to each Stokes output, e.g. changing mode 104 to 204," + "the frequency order will be changed to be decreasing (negative frequency offset). By adding 200, e.g. 121 to 321, the outputwill be changed to time-major, increasing frequency.\n"); } -int checkOpt(int opt, char* inp, char* endPtr) { +int8_t checkOpt(int opt, const char *inp, const char *endPtr) { if (inp != endPtr && *(endPtr) == '\0') { return 0; } @@ -74,6 +74,23 @@ int checkOpt(int opt, char* inp, char* endPtr) { return 1; } +void parseInput(char *inputFormat, const char *optargvar, int8_t *inputProvided) { + strncpy(inputFormat, optargvar, DEF_STR_LEN - 1); + *inputProvided = 1; +} + +int8_t parseOutput(lofar_udp_io_write_config *outConfig, lofar_udp_config *config, const char *optargvar, int8_t *outputProvided) { + if (lofar_udp_io_write_parse_optarg(outConfig, optargvar) < 0) { + fprintf(stderr, "ERROR: Failed to parse output file name %s, exiting.\n", optargvar); + return -1; + } + // If the metadata is not yet set, see if we can parse a requested type from the output filename + if (config->metadata_config.metadataType == NO_META) config->metadata_config.metadataType = lofar_udp_metadata_parse_type_output(optarg); + *outputProvided = 1; + + return 0; +} + /** * Copyright (C) 2023 David McKenna * This file is part of udpPacketManager . diff --git a/src/CLI/lofar_cli_meta.h.in b/src/CLI/lofar_cli_meta.h.in index e0c800eb..d44ab111 100644 --- a/src/CLI/lofar_cli_meta.h.in +++ b/src/CLI/lofar_cli_meta.h.in @@ -17,10 +17,14 @@ extern "C" { #endif // Define prototypes -void helpMessages(void); void sharedFlags(void); void processingModes(void); -int checkOpt(int opt, char *inp, char *endPtr); + +// CLI flag parsers +int8_t checkOpt(int opt, const char *inp, const char *endPtr); +void parseInput(char *inputFormat, const char *optargvar, int8_t *inputProvided); +int8_t parseOutput(lofar_udp_io_write_config *outConfig, lofar_udp_config *config, const char *optargvar, int8_t *outputProvided); + // Exit reasons, 0, 1 aren't handled, only defined up to 3 extern const char exitReasons[8][DEF_STR_LEN]; diff --git a/src/CLI/lofar_cli_stokes.c b/src/CLI/lofar_cli_stokes.c index 0a948e00..aeeb69ca 100644 --- a/src/CLI/lofar_cli_stokes.c +++ b/src/CLI/lofar_cli_stokes.c @@ -1,6 +1,7 @@ #include "lofar_cli_meta.h" // Import FFTW3 for channelisation +#include #include "fftw3.h" // Import omp for parallelisaing the channelisation/downsamling operations @@ -9,58 +10,295 @@ // Constant to define the length of the timing variable array #define TIMEARRLEN 7 +#define dmPhaseConst 2.41e-10 // Hidden somewhere in Hankins et al, can't find a copy + // Output Stokes Modes typedef enum stokes_t { + NOSTOKES= 0, STOKESI = 1, STOKESQ = 2, STOKESU = 4, - STOKESV = 8 + STOKESV = 8, + CORRLTE = 16, } stokes_t; -void helpMessages() { +typedef enum window_t { + // Eq. 1 https://articles.adsabs.harvard.edu/pdf/2006ChJAS...6b..53L + NO_WINDOW = 0, + COHERENT_DEDISP = 1, + PSR_STANDARD = 2, + BOXCAR = 4, + +} window_t; + +struct fftwVars { + fftwf_complex *intermediateX; + fftwf_complex *intermediateY; + fftwf_complex *in1; + fftwf_complex *in2; + fftwf_complex *chirpData; + fftwf_plan fftForwardX; + fftwf_plan fftForwardY; + fftwf_plan fftBackwardX; + fftwf_plan fftBackwardY; +}; + +void helpMessages(void) { printf("LOFAR Stokes extractor (CLI v%s, lib v%s)\n\n", UPM_CLI_VERSION, UPM_VERSION); - printf("Usage: lofar_cli_stokes "); + printf("Usage: lofar_cli_stokes \n\n"); - printf("\n\n"); + sharedFlags(); + printf("\n"); - printf("-C: Channelisation factor to apply when processing data (default: disabled == 1)\n"); + printf("-P Desired Stokes/Voltage outputs, include one or more of 'IQUVA', where A returns XXYY (default: I)\n"); + printf("-F Channelisation factor to apply when processing data (default: disabled == 1)\n"); printf("-d Temporal downsampling to apply when processing data (default: disabled == 1)\n"); - printf("-D Apply temporal downsampling to spectral data (slower, but higher quality (default: disabled)\n"); + printf("-B The count of FFT bins as a factor of the channelisation factor (default: 8, minimum: 3)\n"); + printf("-N The number of FFTs per channel to perform per iteration (default: 512)\n"); + printf("-Z Apply temporal downsampling via spectral data (fft -> sum channels, slower, but theoretically higher quality (default: disabled)\n"); + printf("-D Apply coherent dedispersion to the data (default: false; 0 pc/cc)\n"); + printf("-W Window function to apply to the data (defulat: PSR_STANDARD)\n"); + } -static void CLICleanup(lofar_udp_config *config, lofar_udp_io_write_config *outConfig, int8_t *header, fftwf_complex *X, fftwf_complex *Y) { +static void +CLICleanup(lofar_udp_config *config, lofar_udp_io_write_config *outConfig, struct fftwVars *fftw, float **outputStokes) { FREE_NOT_NULL(outConfig); FREE_NOT_NULL(config); - FREE_NOT_NULL(header); - if (X != NULL) { - fftwf_free(X); - X = NULL; + if (fftw != NULL) { + ARB_FREE_NOT_NULL(fftw->intermediateX, fftwf_free); + ARB_FREE_NOT_NULL(fftw->intermediateY, fftwf_free); + ARB_FREE_NOT_NULL(fftw->in1, fftwf_free); + ARB_FREE_NOT_NULL(fftw->in2, fftwf_free); + ARB_FREE_NOT_NULL(fftw->chirpData, fftwf_free); + + ARB_FREE_NOT_NULL(fftw->fftForwardX, fftwf_destroy_plan); + ARB_FREE_NOT_NULL(fftw->fftForwardY, fftwf_destroy_plan); + ARB_FREE_NOT_NULL(fftw->fftBackwardX, fftwf_destroy_plan); + ARB_FREE_NOT_NULL(fftw->fftBackwardY, fftwf_destroy_plan); + + fftwf_cleanup_threads(); + fftwf_cleanup(); + + free(fftw); + } + + if (outputStokes != NULL) { + for (int32_t i = 0; (i < MAX_OUTPUT_DIMS && outputStokes[i] != NULL); i++) { + FREE_NOT_NULL(outputStokes[i]); + } + } +} + +#pragma omp declare simd +static inline void generateVoltageCorrelations(fftwf_complex X, fftwf_complex Y, float *accumulations) { + accumulations[0] += (X[0] * X[0]) + (X[1] * X[1]); + accumulations[1] += (Y[0] * Y[0]) + (Y[1] * Y[1]); + accumulations[2] += ((X[0] * Y[0]) + (X[1] * Y[1])); + accumulations[3] += ((X[0] * Y[1]) - (X[1] * Y[0])); +} + +#pragma omp declare simd +static inline void calibrateDataFunc(float *X, float *Y, const float *beamletJones, const int8_t **inputs, + const int64_t baseOffset) { + *(X) = calibrateSample(beamletJones[0], inputs[0][baseOffset], \ + -1.0f * beamletJones[1], inputs[0][baseOffset + 1], \ + beamletJones[2], inputs[1][baseOffset], \ + -1.0f * beamletJones[3], inputs[1][baseOffset + 1]); + + *(&(X[1])) = calibrateSample(beamletJones[0], inputs[0][baseOffset + 1], \ + beamletJones[1], inputs[0][baseOffset], \ + beamletJones[2], inputs[1][baseOffset + 1], \ + beamletJones[3], inputs[1][baseOffset]); + + *(Y) = calibrateSample(beamletJones[4], inputs[0][baseOffset], \ + -1.0f * beamletJones[5], inputs[0][baseOffset + 1], \ + beamletJones[6], inputs[1][baseOffset], \ + -1.0f * beamletJones[7], inputs[1][baseOffset + 1]); + + *(&(X[1])) = calibrateSample(beamletJones[4], inputs[0][baseOffset + 1], \ + beamletJones[5], inputs[0][baseOffset], \ + beamletJones[6], inputs[1][baseOffset + 1], \ + beamletJones[7], inputs[1][baseOffset]); +} + +static int8_t windowGenerator(fftwf_complex *const chirpFunc, const double coherentDM, const double fbottom, const double subbandbw, const int32_t mbin, const int32_t nsub, const int32_t chanFac, window_t window) { + double dmFactConst; + int8_t coherentDedisp; + if (window & COHERENT_DEDISP) { + dmFactConst = 2.0 * M_PI * coherentDM / dmPhaseConst; + window ^= COHERENT_DEDISP; // Remove contribution + coherentDedisp = 1; // Set flag (float comparisons are unstable) + } else { + dmFactConst = 0; + coherentDedisp = 0; + } + const double chanbw = subbandbw / (double) chanFac; + + for (int32_t subband = 0; subband < nsub; subband++) { + const double subbandFreq = fbottom + (double) subband * subbandbw + 0.5 * subbandbw; + + for (int32_t chan = 0; chan < chanFac; chan++) { + const double channelFreq = subbandFreq - 0.5 * subbandbw + subbandbw * ((double) chan / (double) chanFac) + 0.5 * chanbw; + + for (int32_t bin = 0; bin < mbin; bin++) { + const int32_t fftSpaceIdx = subband * mbin * chanFac + chan * mbin + bin; + + // Frequency in FFT space + const double binFreq = chanbw * ((double) bin / (double) mbin) + 0.5 * (chanbw / (double) mbin - chanbw); + double taperScale; + + + switch (window) { + case PSR_STANDARD: + taperScale = 1.0 / sqrt(1.0 + pow(binFreq / (0.47 * chanbw), 80.0)); + break; + + case BOXCAR: + taperScale = 1; + break; + + default: + fprintf(stderr, "ERROR: No window selected for %s (window %d), exiting.\n", __func__, window); + return 1; + break; + } + + taperScale /= (double) (mbin * chanFac); + + + if (coherentDedisp) { + const double phase = -1.0f * binFreq * binFreq * dmFactConst / ((channelFreq + binFreq) + channelFreq * channelFreq); + chirpFunc[fftSpaceIdx][0] = (float) (cos(phase) * taperScale); + chirpFunc[fftSpaceIdx][1] = (float) (sin(phase) * taperScale); + } else { + chirpFunc[fftSpaceIdx][0] = (float) taperScale; + chirpFunc[fftSpaceIdx][1] = (float) taperScale; + } + } + } + } + + return 0; +} + +#define npol 2 +static void windowData(fftwf_complex *const x, fftwf_complex *const y, const fftwf_complex *chirpData, const int32_t mbin, const int32_t nsub, const int32_t channelisation, const int32_t nfft) { + fftwf_complex *data[npol] = { x, y }; + + for (int i = 0; i < npol; i++) { + fftwf_complex *const workingPtr = data[i]; + + #pragma omp parallel for default(shared) + for (int32_t subband = 0; subband < nsub; subband++) { + for (int32_t fft = 0; fft < nfft; fft++) { + for (int32_t bin = 0; bin < mbin; bin++) { + for (int32_t sample = 0; sample < channelisation; sample++) { + const size_t inputIdx = subband * nfft * mbin * channelisation + fft * mbin * channelisation + bin * channelisation + sample; + const size_t freqIdx = subband * mbin * channelisation + sample * mbin + bin; + + workingPtr[inputIdx][0] *= chirpData[freqIdx][0]; + workingPtr[inputIdx][1] *= chirpData[freqIdx][1]; + + } + } + } + } } - if (Y != NULL) { - fftwf_free(Y); - Y = NULL; +} + +static void overlapAndPad(fftwf_complex *const outputs[2], const int8_t **inputs, const float *beamletJones, const int64_t nbin, const int32_t noverlap, const int32_t nsub, const int32_t nfft) { + #pragma omp parallel for default(shared) + for (int32_t sub = 0; sub < nsub; sub++) { + const size_t baseIndexInput = sub * (nbin - 2 * noverlap) * nfft; + const size_t baseIndexOutput = sub * nbin * nfft; + for (int32_t fft = 0; fft < nfft; fft++) { + // On the first FFT: don't overwrite previous iteration's padding + const int32_t binStart = fft ? 0 : noverlap * 2; + const int32_t binOffset = noverlap * 2; + + const size_t fftIndexInput = baseIndexInput + fft * (nbin - 2 * noverlap); + const size_t fftIndexOutput = baseIndexOutput + fft * nbin; + + for (int64_t bin = binStart; bin < nbin; bin++) { + const size_t outputIndex = fftIndexOutput + bin; + const size_t inputIndex = 2 * (fftIndexInput + bin - binOffset); + if (beamletJones != NULL) { + calibrateDataFunc(outputs[0][outputIndex], outputs[1][outputIndex], beamletJones, inputs, inputIndex); + } else { + outputs[0][outputIndex][0] = (float) inputs[0][inputIndex]; + outputs[0][outputIndex][1] = (float) inputs[0][inputIndex + 1]; + outputs[1][outputIndex][0] = (float) inputs[1][inputIndex]; + outputs[1][outputIndex][1] = (float) inputs[1][inputIndex + 1]; + } + } + } } } -static void reorderData(fftwf_complex *x, fftwf_complex *y, size_t bins, size_t channels) { - fftwf_complex *data[] = { x, y }; - fftwf_complex tmp; +static void padNextIteration(fftwf_complex *outputs[2], const int8_t **inputs, const float *beamletJones, const int32_t nbin, const int32_t noverlap, const int32_t nsub, const int32_t nfft) { + // First iteration: reflection padding + if (nfft < 0) { + //nfft *= -1; + #pragma omp parallel for default(shared) + for (int32_t sub = 0; sub < nsub; sub++) { + const size_t baseIndexInput = sub * (nbin - 2 * noverlap) * -1 * nfft + (2 * noverlap) - 1; + const size_t baseIndexOutput = -1 * sub * nbin * nfft; + + for (int32_t bin = 0; bin < 2 * noverlap; bin++) { + const size_t inputIndex = 2 * (baseIndexInput - bin); + const size_t outputIndex = baseIndexOutput + bin; + if (beamletJones != NULL) { + calibrateDataFunc(outputs[0][outputIndex], outputs[1][outputIndex], beamletJones, inputs, inputIndex); + } else { + outputs[0][outputIndex][0] = (float) inputs[0][inputIndex]; + outputs[0][outputIndex][1] = (float) inputs[0][inputIndex + 1]; + outputs[1][outputIndex][0] = (float) inputs[1][inputIndex]; + outputs[1][outputIndex][1] = (float) inputs[1][inputIndex + 1]; + } + } + } + // Otherwise: normal padding + } else { + #pragma omp parallel for default(shared) + for (int32_t sub = 0; sub < nsub; sub++) { + const size_t baseIndexInput = (sub + 1) * (nbin - 2 * noverlap) * nfft - 2 * noverlap; + const size_t baseIndexOutput = sub * nbin * nfft; + + for (int32_t bin = 0; bin < 2 * noverlap; bin++) { + const size_t inputIndex = 2 * (baseIndexInput + bin); + const size_t outputIndex = baseIndexOutput + bin; + if (beamletJones != NULL) { + calibrateDataFunc(outputs[0][outputIndex], outputs[1][outputIndex], beamletJones, inputs, inputIndex); + } else { + outputs[0][outputIndex][0] = (float) inputs[0][inputIndex]; + outputs[0][outputIndex][1] = (float) inputs[0][inputIndex + 1]; + outputs[1][outputIndex][0] = (float) inputs[1][inputIndex]; + outputs[1][outputIndex][1] = (float) inputs[1][inputIndex + 1]; + } + } + } + } +} - size_t inputIdx, outputIdx; +static void reorderData(fftwf_complex *const x, fftwf_complex *const y, const int32_t bins, const int32_t channels) { + fftwf_complex *const data[npol] = { x, y }; + const int32_t spectrumOffset = (bins + bins % 2) / 2; + for (int32_t i = 0; i < npol; i++) { + fftwf_complex *const workingPtr = data[i]; - for (int i = 0; i < 2; i++) { - fftwf_complex *workingPtr = data[i]; + #pragma omp parallel for default(shared) + for (int32_t channel = 0; channel < channels; channel++) { + fftwf_complex tmp; -#pragma omp parallel for default(shared) shared(workingPtr) - for (size_t channel = 0; channel < channels; channel++) { - for (size_t sample = 0; sample < (bins / 2); sample++) { - inputIdx = sample + channel * bins; - outputIdx = inputIdx + (bins / 2); + for (int32_t sample = 0; sample < spectrumOffset; sample++) { + const size_t inputIdx = channel * bins + sample; + const size_t outputIdx = inputIdx + spectrumOffset; tmp[0] = workingPtr[inputIdx][0]; tmp[1] = workingPtr[inputIdx][1]; @@ -73,117 +311,142 @@ static void reorderData(fftwf_complex *x, fftwf_complex *y, size_t bins, size_t } } } +#undef npol -static void transposeDetect(fftwf_complex *X, fftwf_complex *Y, float **outputs, size_t mbin, size_t channelisation, size_t nsub, size_t channelDownsample, int stokesFlags) { - float accumulator[4] = { 0.0f, 0.0f, 0.0f, 0.0f }; +static void +transposeDetect(fftwf_complex (*X), fftwf_complex (*Y), float **outputs, const int64_t mbin, int32_t noverlap, const int32_t nfft, const int32_t channelisation, const int32_t nsub, + int32_t channelDownsample, const stokes_t stokesFlags) { if (channelDownsample < 1) { channelDownsample = 1; } - size_t inputIdx, outputIdx, outputArr; - size_t accumulations = 0; - size_t outputNchan = nsub * channelisation / channelDownsample; - - printf("Detecting for: %zu, %zu, %zu, %zu\n", mbin, channelisation, nsub, channelDownsample); - -#pragma omp parallel for default(shared) private(accumulator, accumulations, inputIdx, outputIdx, outputArr) - for (size_t sub = 0; sub < nsub; sub++) { - for (size_t sample = 0; sample < mbin; sample++) { - ARR_INIT(accumulator, 4, 0.0f); - - accumulations = 0; - - //#pragma omp simd //nontemporal(outputs) - for (size_t chan = 0; chan < channelisation; chan++) { - - // Input is time major - // curr total samples size per channel - inputIdx = sample + (chan * mbin) + (sub * mbin * channelisation); - - if (stokesFlags & STOKESI) { - accumulator[0] += stokesI(X[inputIdx][0], X[inputIdx][1], Y[inputIdx][0], Y[inputIdx][1]); - } - if (stokesFlags & STOKESQ) { - accumulator[1] += stokesQ(X[inputIdx][0], X[inputIdx][1], Y[inputIdx][0], Y[inputIdx][1]); - } - if (stokesFlags & STOKESU) { - accumulator[2] += stokesU(X[inputIdx][0], X[inputIdx][1], Y[inputIdx][0], Y[inputIdx][1]); - } - if (stokesFlags & STOKESV) { - accumulator[3] += stokesV(X[inputIdx][0], X[inputIdx][1], Y[inputIdx][0], Y[inputIdx][1]); - } - if (++accumulations == channelDownsample) { - // Output is channel major - // curr total channels size per time sample - outputIdx = (chan / channelDownsample) + (sub * channelisation / channelDownsample) + (outputNchan * sample); - - outputArr = 0; - if (stokesFlags & STOKESI) { - outputs[outputArr][outputIdx] = accumulator[0]; - accumulator[0] = 0.0f; - outputArr++; - } - if (stokesFlags & STOKESQ) { - outputs[outputArr][outputIdx] = accumulator[1]; - accumulator[1] = 0.0f; - outputArr++; - } - if (stokesFlags & STOKESU) { - outputs[outputArr][outputIdx] = accumulator[2]; - accumulator[2] = 0.0f; - outputArr++; + noverlap /= channelisation; + const size_t outputNchan = nsub * channelisation / channelDownsample; + const size_t outputMbin = mbin - 2 * noverlap; + const int64_t correlateScale = (stokesFlags & CORRLTE) ? UDPNPOL : 1; + + #pragma omp parallel for default(shared) + for (int32_t sub = 0; sub < nsub; sub++) { + + for (int32_t fft = 0; fft < nfft; fft++) { + float accumulator[4] = { 0.0f, 0.0f, 0.0f, 0.0f }; + int32_t accumulations = 0; + + for (int64_t sample = noverlap; sample < (mbin - noverlap); sample++) { + + for (int32_t chan = 0; chan < channelisation; chan++) { + + // Input is time major + // curr channels ffts subbands + const size_t inputIdx = sample + chan * mbin + fft * mbin * channelisation + sub * nfft * mbin * channelisation; + + if (stokesFlags & CORRLTE) { + generateVoltageCorrelations(X[inputIdx], Y[inputIdx], accumulator); + } else { + if (stokesFlags & STOKESI) { + accumulator[0] += stokesI(X[inputIdx][0], X[inputIdx][1], Y[inputIdx][0], Y[inputIdx][1]); + } + if (stokesFlags & STOKESQ) { + accumulator[1] += stokesQ(X[inputIdx][0], X[inputIdx][1], Y[inputIdx][0], Y[inputIdx][1]); + } + if (stokesFlags & STOKESU) { + accumulator[2] += stokesU(X[inputIdx][0], X[inputIdx][1], Y[inputIdx][0], Y[inputIdx][1]); + } + if (stokesFlags & STOKESV) { + accumulator[3] += stokesV(X[inputIdx][0], X[inputIdx][1], Y[inputIdx][0], Y[inputIdx][1]); + } } - if (stokesFlags & STOKESV) { - outputs[outputArr][outputIdx] = accumulator[3]; - accumulator[3] = 0.0f; - outputArr++; + + if (++accumulations == channelDownsample) { + // Output is channel major + const int64_t effectiveSample = (sample - noverlap) + fft * outputMbin; + // curr total channels size per time sample + const size_t outputIdx = correlateScale * ((chan / channelDownsample) + (sub * channelisation / channelDownsample) + (outputNchan * effectiveSample)); + + size_t outputArr = 0; + if (stokesFlags & CORRLTE) { + outputs[outputArr][outputIdx] = accumulator[0]; + outputs[outputArr][outputIdx + 1] = accumulator[1]; + outputs[outputArr][outputIdx + 2] = accumulator[2]; + outputs[outputArr][outputIdx + 3] = accumulator[3]; + + accumulator[0] = 0.0f; + accumulator[1] = 0.0f; + accumulator[2] = 0.0f; + accumulator[3] = 0.0f; + } else { + if (stokesFlags & STOKESI) { + outputs[outputArr][outputIdx] = accumulator[0]; + accumulator[0] = 0.0f; + outputArr++; + } + if (stokesFlags & STOKESQ) { + outputs[outputArr][outputIdx] = accumulator[1]; + accumulator[1] = 0.0f; + outputArr++; + } + if (stokesFlags & STOKESU) { + outputs[outputArr][outputIdx] = accumulator[2]; + accumulator[2] = 0.0f; + outputArr++; + } + if (stokesFlags & STOKESV) { + outputs[outputArr][outputIdx] = accumulator[3]; + accumulator[3] = 0.0f; + outputArr++; + } + } + accumulations = 0; } - accumulations = 0; } } } } } -static void temporalDownsample(float **data, size_t numOutputs, size_t nbin, size_t nchans, size_t downsampleFactor) { +static void temporalDownsample(float ** const data, const size_t numOutputs, const int64_t nbin, const int64_t nchans, const int32_t downsampleFactor) { if (downsampleFactor < 2) { // Nothing to do return; } - size_t inputIdx, outputIdx; - size_t accumulations = 0; - float accumulator = 0.0f; + VERBOSE(printf("Downsampling for: %ld, %ld, %d\n", nchans, nbin, downsampleFactor)); - VERBOSE(printf("Downsampling for: %zu, %zu, %zu\n", nchans, nbin, downsampleFactor)); + float *accumulator = calloc(nchans, sizeof(float)); for (size_t output = 0; output < numOutputs; output++) { -#pragma omp parallel for default(shared) private(accumulator, accumulations, inputIdx, outputIdx) - for (size_t sub = 0; sub < nchans; sub++) { - accumulations = 0; - accumulator = 0.0f; - - for (size_t sample = 0; sample < nbin; sample++) { + int32_t accumulations = 0; + for (int64_t sample = 0; sample < nbin; sample++) { + const int64_t chanOffset = sample * nchans; + #pragma omp parallel for default(shared) + for (int64_t sub = 0; sub < nchans; sub++) { // Input is time major - // curr size per sample - inputIdx = sub + (sample * nchans); + // curr size per sample + const size_t inputIdx = sub + (chanOffset); - accumulator += data[output][inputIdx]; + accumulator[sub] += data[output][inputIdx]; + } - if (++accumulations == downsampleFactor) { + if (++accumulations == downsampleFactor) { + const int64_t dsChanOffset = sample / downsampleFactor; + #pragma omp parallel for default(shared) + for (int64_t sub = 0; sub < nchans; sub++) { // Output is channel major - // curr size per time sample - outputIdx = sub + (sample / downsampleFactor) * nchans; - data[output][outputIdx] = accumulator; - accumulations = 0; + // curr size per output time sample + const size_t outputIdx = sub + dsChanOffset; + data[output][outputIdx] = accumulator[sub]; + accumulator[sub] = 0.0f; } + accumulations = 0; } } } + + free(accumulator); } int main(int argc, char *argv[]) { @@ -191,16 +454,16 @@ int main(int argc, char *argv[]) { // Set up input local variables int32_t inputOpt, input = 0; float seconds = 0.0f; + double coherentDM = 0.0f; char inputTime[256] = "", stringBuff[128] = "", inputFormat[DEF_STR_LEN] = ""; int32_t silent = 0, inputProvided = 0, outputProvided = 0; int64_t maxPackets = LONG_MAX, startingPacket = -1, splitEvery = LONG_MAX; int8_t clock200MHz = 1; + window_t window = NO_WINDOW; lofar_udp_config *config = lofar_udp_config_alloc(); lofar_udp_io_write_config *outConfig = lofar_udp_io_write_alloc(); - int8_t *headerBuffer = NULL; - if (config == NULL || outConfig == NULL) { fprintf(stderr, "ERROR: Failed to allocate memory for configuration structs (something has gone very wrong...), exiting.\n"); FREE_NOT_NULL(config); FREE_NOT_NULL(outConfig); @@ -221,25 +484,22 @@ int main(int argc, char *argv[]) { char *endPtr; int8_t flagged = 0; + struct fftwVars *fftw = calloc(sizeof(struct fftwVars), 1); + // FFTW strategy - int32_t channelisation = 1, downsampling = 1, spectralDownsample = 0; - fftwf_complex *intermediateX = NULL; - fftwf_complex *intermediateY = NULL; - fftwf_plan fftForwardX; - fftwf_plan fftForwardY; - fftwf_plan fftBackwardX; - fftwf_plan fftBackwardY; - int8_t stokesParameters = 0, numStokes = 0; + int32_t channelisation = 1, downsampling = 1, spectralDownsample = 0, nfactor = 8, nforward = 512; + stokes_t stokesParameters = NOSTOKES; + int8_t numStokes = 0; // Standard ugly input flags parser - while ((inputOpt = getopt(argc, argv, "rzqfvVDhi:o:m:M:I:u:t:s:S:b:C:d:P:T:")) != -1) { + while ((inputOpt = getopt(argc, argv, "crzqfvVZhD:i:o:m:M:I:u:t:s:S:b:C:F:d:P:T:B:N:")) != -1) { input = 1; switch (inputOpt) { case 'i': if (strncpy(inputFormat, optarg, DEF_STR_LEN - 1) != inputFormat) { fprintf(stderr, "ERROR: Failed to store input data file format, exiting.\n"); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return 1; } inputProvided = 1; @@ -248,19 +508,15 @@ int main(int argc, char *argv[]) { case 'o': if (lofar_udp_io_write_parse_optarg(outConfig, optarg) < 0) { + fprintf(stderr, "ERROR: Failed to parse output file pattern, exiting.\n"); helpMessages(); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return 1; } if (config->metadata_config.metadataType == NO_META) config->metadata_config.metadataType = lofar_udp_metadata_parse_type_output(optarg); outputProvided = 1; break; - case 'm': - config->packetsPerIteration = strtol(optarg, &endPtr, 10); - if (checkOpt(inputOpt, optarg, endPtr)) { flagged = 1; } - break; - case 'M': config->metadata_config.metadataType = lofar_udp_metadata_string_to_meta(optarg); break; @@ -268,7 +524,7 @@ int main(int argc, char *argv[]) { case 'I': if (strncpy(config->metadata_config.metadataLocation, optarg, DEF_STR_LEN) != config->metadata_config.metadataLocation) { fprintf(stderr, "ERROR: Failed to copy metadata file location to config, exiting.\n"); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return 1; } break; @@ -281,7 +537,7 @@ int main(int argc, char *argv[]) { case 't': if (strncpy(inputTime, optarg, 255) != inputTime) { fprintf(stderr, "ERROR: Failed to copy start time from input, exiting.\n"); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return 1; } break; @@ -299,7 +555,7 @@ int main(int argc, char *argv[]) { case 'b': if (sscanf(optarg, "%hd,%hd", &(config->beamletLimits[0]), &(config->beamletLimits[1])) < 0) { fprintf(stderr, "ERROR: Failed to scan input beamlets, exiting.\n"); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return 1; } break; @@ -308,7 +564,16 @@ int main(int argc, char *argv[]) { config->replayDroppedPackets = 1; break; + case 'c': + config->calibrateData = GENERATE_JONES; + break; + case 'C': + config->calibrationDuration = strtof(optarg, &endPtr); + if (checkOpt(inputOpt, optarg, endPtr)) { flagged = 1; } + break; + + case 'F': channelisation = internal_strtoi(optarg, &endPtr); if (checkOpt(inputOpt, optarg, endPtr)) { flagged = 1; } break; @@ -318,10 +583,6 @@ int main(int argc, char *argv[]) { if (checkOpt(inputOpt, optarg, endPtr)) { flagged = 1; } break; - case 'z': - clock200MHz = 0; - break; - case 'q': silent = 1; break; @@ -343,31 +604,59 @@ int main(int argc, char *argv[]) { if (checkOpt(inputOpt, optarg, endPtr)) { flagged = 1; } break; - case 'D': + case 'B': + nfactor = internal_strtoi(optarg, &endPtr); + if (checkOpt(inputOpt, optarg, endPtr)) { flagged = 1; } + if (!flagged && nfactor < 6) { + fprintf(stderr, "ERROR: nfactor must be at least 6 due to FFT overlaps. Exiting.\n"); + CLICleanup(config, outConfig, fftw, NULL); + return 1; + } + break; + + case 'N': + nforward = internal_strtoi(optarg, &endPtr); + if (checkOpt(inputOpt, optarg, endPtr)) { flagged = 1; } + break; + + case 'Z': spectralDownsample = 1; break; + case 'D': + coherentDM = strtof(optarg, &endPtr); + window = COHERENT_DEDISP + PSR_STANDARD; + if (checkOpt(inputOpt, optarg, endPtr)) { flagged = 1; } + break; + case 'P': if (numStokes > 0) { fprintf(stderr, "ERROR: -P flag has been parsed more than once. Exiting.\n"); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return -1; } - if (strchr(optarg, 'I') != NULL) { - numStokes += 1; - stokesParameters += STOKESI; - } - if (strchr(optarg, 'Q') != NULL) { - numStokes += 1; - stokesParameters += STOKESQ; - } - if (strchr(optarg, 'U') != NULL) { + stokesParameters = NOSTOKES; + numStokes = 0; + if (strchr(optarg, 'A') != NULL) { numStokes = 1; - stokesParameters += STOKESU; - } - if (strchr(optarg, 'V') != NULL) { - numStokes += 1; - stokesParameters += STOKESV; + stokesParameters = CORRLTE; + } else { + if (strchr(optarg, 'I') != NULL) { + numStokes += 1; + stokesParameters += STOKESI; + } + if (strchr(optarg, 'Q') != NULL) { + numStokes += 1; + stokesParameters += STOKESQ; + } + if (strchr(optarg, 'U') != NULL) { + numStokes = 1; + stokesParameters += STOKESU; + } + if (strchr(optarg, 'V') != NULL) { + numStokes += 1; + stokesParameters += STOKESV; + } } break; @@ -394,43 +683,61 @@ int main(int argc, char *argv[]) { #pragma GCC diagnostic pop helpMessages(); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return 1; } } + int64_t windowCheck = window & ~COHERENT_DEDISP; + if (windowCheck & (windowCheck -1)) { + fprintf(stderr, "ERROR: More than 1 window has been set. Attempting to patch back to "); + if (window & PSR_STANDARD) { + fprintf(stderr, "the pulsar window.\n"); + window &= (COHERENT_DEDISP & PSR_STANDARD); + } else if (window & BOXCAR) { + fprintf(stderr, "the boxcar window.\n"); + window &= (COHERENT_DEDISP & BOXCAR); + } + } else if (!windowCheck && (channelisation > 1 || (downsampling > 1 && spectralDownsample))) { + fprintf(stderr, "WARNING: No window was set, falling back to the pulsar window.\n"); + window |= PSR_STANDARD; + } + + int64_t correlateScale = (stokesParameters & CORRLTE) ? UDPNPOL : 1; + // Pre-set processing mode - // TODO: Floating processing mode if chanellisation is disabled? - config->processingMode = TIME_MAJOR_ANT_POL_FLOAT; + // TODO: Floating processing mode if channelisation is disabled? + if (channelisation > 1) { + config->processingMode = TIME_MAJOR_ANT_POL; + } else { + config->processingMode = TIME_MAJOR_ANT_POL_FLOAT; + } if (flagged) { - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return 1; } - if (!input) { + if (!input || !inputProvided) { fprintf(stderr, "ERROR: No inputs provided, exiting.\n"); helpMessages(); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return 1; } - if (!inputProvided) { - fprintf(stderr, "ERROR: An input was not provided, exiting.\n"); - helpMessages(); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); - return 1; + if (numStokes == NOSTOKES) { + printf("No Stokes configuration provided; defaulting to STOKESI\n"); + numStokes = 1; + stokesParameters = STOKESI; } // Pass forward output channelisation and downsampling factors config->metadata_config.externalChannelisation = channelisation; config->metadata_config.externalDownsampling = downsampling; - if (channelisation > 1) { - if (spectralDownsample) { - channelisation *= downsampling; - } + if (spectralDownsample && downsampling < 2) { + fprintf(stderr, "WARNING: Spectral downsampling is enabled, but the downsampling factor was not set.\n"); } if (spectralDownsample) { @@ -438,52 +745,67 @@ int main(int argc, char *argv[]) { channelisation *= downsampling; } - - if ((config->packetsPerIteration * UDPNTIMESLICE) % (512 > channelisation ? 512 : channelisation)) { - fprintf(stderr, "ERROR: Number of samples needed per iterations for channelisation factor %d and downsampling factor %d (%d) is not a multiple of number of the set number of timesamples/packets per iteration (%ld), exiting.\n", channelisation, downsampling, (512 > channelisation ? 512 : channelisation), UDPNTIMESLICE * config->packetsPerIteration); + if (channelisation < 1) { + fprintf(stderr, "WARNING: Channelisation was set to %d, returning to 1.\n", channelisation); + channelisation = 1; + } else if ((channelisation > 1 && channelisation % 2) != 0) { + fprintf(stderr, "ERROR: Invalid channelisation factor (less than 1, non-factor of 2)\n"); helpMessages(); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return 1; } - /* - if (((long long) (channelisation * downsampling)) % config->packetsPerIteration != 0) { - fprintf(stderr, "ERROR: Number of packets per iteration is not evenly divisible by the number of samples needed to process at the given channelisation factor %ld and downsampling factor %ld (%ld, %ld remainder), exiting.\n", channelisation, downsampling, channelisation * downsampling, (channelisation * downsampling) % config->packetsPerIteration); - helpMessages(); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); - return 1; - } - */ + const int32_t noverlap = 2 * (channelisation ?: 0); + const int32_t nbin = nfactor * channelisation; + const int32_t nbin_valid = nbin - 2 * noverlap; - if (channelisation < 1 || ( channelisation > 1 && channelisation % 2) != 0) { - fprintf(stderr, "ERROR: Invalid channelisation factor (less than 1, non-factor of 2)\n"); - helpMessages(); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); - return 1; + if ((nbin_valid * nforward) % UDPNTIMESLICE) { + fprintf(stderr, "WARNING: Increasing nforward from %d to ", nforward); + nforward += (UDPNTIMESLICE - (nforward % UDPNTIMESLICE)); + fprintf(stderr, "%d to ensure data can be loaded correctly (validate as 0: %d).\n", nforward, (nbin_valid * nforward) % UDPNTIMESLICE); + } + config->packetsPerIteration = (nbin_valid * nforward) / UDPNTIMESLICE; + + if (channelisation > 1 || window & COHERENT_DEDISP) { + // Should no longer be needed; keeping for debug/validation purposes; remove before release + int32_t invalidation = (config->packetsPerIteration * UDPNTIMESLICE) % nbin_valid; + if (invalidation) { + fprintf(stderr, "WARNING: Reducing packets per iteration by %d to attempt to align with FFT boundaries.\n", invalidation / UDPNTIMESLICE); + config->packetsPerIteration -= invalidation / UDPNTIMESLICE; + } + if ((config->packetsPerIteration * UDPNTIMESLICE) % nbin_valid) { + fprintf(stderr, + "ERROR: Number of samples needed per iterations for channelisation factor %d and downsampling factor %d (%d) is not a multiple of number of the set number of timesamples/packets per iteration (%ld), exiting.\n", + channelisation, downsampling, nbin_valid, UDPNTIMESLICE * config->packetsPerIteration); + helpMessages(); + CLICleanup(config, outConfig, fftw, NULL); + return 1; + } } if (downsampling < 1) { fprintf(stderr, "ERROR: Invalid downsampling factor (less than 1)\n"); helpMessages(); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return 1; } if (lofar_udp_io_read_parse_optarg(config, inputFormat) < 0) { + fprintf(stderr, "ERROR: Failed to parse input file format, exiting.\n"); helpMessages(); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return 1; } if (!outputProvided) { fprintf(stderr, "ERROR: An output was not provided, exiting.\n"); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return 1; } if (config->calibrateData != NO_CALIBRATION && strcmp(config->metadata_config.metadataLocation, "") == 0) { fprintf(stderr, "ERROR: Data calibration was enabled, but metadata was not provided. Exiting.\n"); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return 1; } @@ -499,11 +821,12 @@ int main(int argc, char *argv[]) { fprintf(stderr, "One or more inputs invalid or not fully initialised, exiting.\n"); helpMessages(); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return 1; } - + const float sampleTime = 1.0 / (clock200MHz ? CLOCK200MHZ : CLOCK160MHZ); + const float timePerGulp = (float) (config->packetsPerIteration * UDPNTIMESLICE) * sampleTime; if (silent == 0) { printf("LOFAR Stokes Data extractor (v%s, lib v%s)\n\n", UPM_CLI_VERSION, UPM_VERSION); printf("=========== Given configuration ===========\n"); @@ -513,6 +836,7 @@ int main(int argc, char *argv[]) { printf("Output File: %s\n\n", outConfig->outputFormat); printf("Packets/Gulp:\t%ld\t\t\tPorts:\t%d\n\n", config->packetsPerIteration, config->numPorts); + printf("Time (s) /Gulp:\t%f\t\tTime (s)/Iteration:\t%f\n\n", timePerGulp, config->numPorts * timePerGulp); VERBOSE(printf("Verbose:\t%d\n", config->verbose);); printf("Proc Mode:\t%03d\t\t\tReader:\t%d\n\n", config->processingMode, config->readerType); printf("Beamlet limits:\t%d, %d\n\n", config->beamletLimits[0], config->beamletLimits[1]); @@ -520,9 +844,10 @@ int main(int argc, char *argv[]) { if (strcmp(inputTime, "") != 0) { startingPacket = lofar_udp_time_get_packet_from_isot(inputTime, clock200MHz); - if (startingPacket == 1) { + if (startingPacket == -1) { + fprintf(stderr, "ERROR: Failed to parse input time %s, exiting.\n", inputTime); helpMessages(); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return 1; } } @@ -552,7 +877,8 @@ int main(int argc, char *argv[]) { fprintf(stderr, "ERROR: Failed to initialise multi-threaded FFTWF.\n"); } omp_set_num_threads(config->ompThreads); - fftwf_plan_with_nthreads(omp_get_num_threads()); + fftwf_plan_with_nthreads(omp_get_max_threads()); + printf("Using %d (%d) threads.\n", config->ompThreads, omp_get_max_threads()); @@ -570,7 +896,7 @@ int main(int argc, char *argv[]) { // Returns null on error, check if (reader == NULL) { fprintf(stderr, "Failed to generate reader. Exiting.\n"); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return 1; } @@ -578,52 +904,99 @@ int main(int argc, char *argv[]) { if (((lofar_source_bytes *) &(reader->meta->inputData[0][1]))->clockBit != (unsigned int) clock200MHz) { fprintf(stderr, "ERROR: The clock bit of the first packet does not match the clock state given when starting the CLI. Add or remove -c from your command. Exiting.\n"); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); return 1; } if (reader->packetsPerIteration * UDPNTIMESLICE > INT32_MAX) { fprintf(stderr, "ERROR: Input FFT bins are too long (%ld vs %d), exiting.\n", reader->packetsPerIteration * UDPNTIMESLICE, INT32_MAX); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, NULL); + return 1; + } + + // Override nbit to 32 + reader->metadata->nbit = -32; + + // Override nifs for correlations output + if (correlateScale == UDPNPOL) { + reader->metadata->npol = 2; + reader->metadata->ndim = 2; + } else { + reader->metadata->npol = 1; + reader->metadata->ndim = 1; + } + + if (config->calibrateData == GENERATE_JONES) { + reader->metadata->upm_calibrated = APPLY_CALIBRATION; + } + + if (_lofar_udp_metadata_setup_types(reader->metadata)) { + fprintf(stderr, "ERROR: Failed to update metadata struct, exiting.\n"); + CLICleanup(config, outConfig, fftw, NULL); return 1; } // Channelisation setup - int32_t nbin = (int32_t) 512 > channelisation ? 512 : channelisation; - int32_t mbin = nbin / channelisation; - int32_t nsub = reader->meta->totalProcBeamlets; - int32_t nchan = reader->meta->totalProcBeamlets * channelisation; + const int32_t mbin = nbin / channelisation; + const int32_t nsub = reader->meta->totalProcBeamlets; + const int32_t nchan = nsub * channelisation; - int32_t nfft = (int32_t) ((reader->packetsPerIteration * UDPNTIMESLICE) / nbin); + const int32_t nfft = (int32_t) ((reader->packetsPerIteration * UDPNTIMESLICE) / nbin_valid); - fftwf_complex * in1 = NULL; - fftwf_complex * in2 = NULL; + int64_t floatWriteOffset = (2 * noverlap * nchan) / downsampling; // This will cause issues for an odd-factored downsample which isn't a multiple of the channelsiation factor + //int32_t nfft = 1; //printf("%d, %d, %d, %d\n", nbin, mbin, nsub, nchan); - float *outputStokes[MAX_OUTPUT_DIMS]; + fftw->in1 = fftwf_alloc_complex(nbin * nsub * nfft); + fftw->in2 = fftwf_alloc_complex(nbin * nsub * nfft); + + fftwf_complex *inFFTArrs[2] = { fftw->in1, fftw->in2 }; + + if (fftw->in1 == NULL || fftw->in2 == NULL) { + fprintf(stderr, "ERROR: Failed to allocate input FFTW buffers, exiting.\n"); + CLICleanup(config, outConfig, fftw, NULL); + return -1; + } - if (channelisation > 0) { - in1 = fftwf_alloc_complex(nbin * nsub * nfft); - in2 = fftwf_alloc_complex(nbin * nsub * nfft); - intermediateX = fftwf_alloc_complex(nbin * nsub * nfft); - intermediateY = fftwf_alloc_complex(nbin * nsub * nfft); - if (intermediateX == NULL || intermediateY == NULL) { + if (channelisation > 1 || window & COHERENT_DEDISP) { + fftw->intermediateX = fftwf_alloc_complex(nbin * nsub * nfft); + fftw->intermediateY = fftwf_alloc_complex(nbin * nsub * nfft); + fftw->chirpData = fftwf_alloc_complex(nbin * nsub * nfft); + + if (fftw->intermediateX == NULL || fftw->intermediateY == NULL || fftw->chirpData == NULL) { fprintf(stderr, "ERROR: Failed to allocate output FFTW buffers, exiting.\n"); - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); - return -1; + CLICleanup(config, outConfig, fftw, NULL); + return 1; } - fftForwardX = fftwf_plan_many_dft(1, &nbin, nsub * nfft, in1, &nbin, 1, nbin, intermediateX, &nbin, 1, nbin, FFTW_FORWARD, FFTW_ESTIMATE_PATIENT); - fftForwardY = fftwf_plan_many_dft(1, &nbin, nsub * nfft, in2, &nbin, 1, nbin, intermediateY, &nbin, 1, nbin, FFTW_FORWARD, FFTW_ESTIMATE_PATIENT); + fftw->fftForwardX = fftwf_plan_many_dft(1, &nbin, nsub * nfft, fftw->in1, NULL, 1, nbin, fftw->intermediateX, NULL, 1, nbin, FFTW_FORWARD, FFTW_ESTIMATE_PATIENT | FFTW_ALLOW_LARGE_GENERIC | FFTW_DESTROY_INPUT); + fftw->fftForwardY = fftwf_plan_many_dft(1, &nbin, nsub * nfft, fftw->in2, NULL, 1, nbin, fftw->intermediateY, NULL, 1, nbin, FFTW_FORWARD, FFTW_ESTIMATE_PATIENT | FFTW_ALLOW_LARGE_GENERIC | FFTW_DESTROY_INPUT); + fftw->fftBackwardX = fftwf_plan_many_dft(1, &mbin, nchan * nfft, fftw->intermediateX, NULL, 1, mbin, fftw->in1, NULL, 1, mbin, FFTW_BACKWARD, FFTW_ESTIMATE_PATIENT | FFTW_ALLOW_LARGE_GENERIC | FFTW_DESTROY_INPUT); + fftw->fftBackwardY = fftwf_plan_many_dft(1, &mbin, nchan * nfft, fftw->intermediateY, NULL, 1, mbin, fftw->in2, NULL, 1, mbin, FFTW_BACKWARD, FFTW_ESTIMATE_PATIENT | FFTW_ALLOW_LARGE_GENERIC | FFTW_DESTROY_INPUT); + + if (fftw->fftForwardX == NULL || fftw->fftForwardY == NULL || fftw->fftBackwardX == NULL || fftw->fftBackwardY == NULL) { + fprintf(stderr, "ERROR: Failed to generate FFTW plans, exiting.\n"); + CLICleanup(config, outConfig, fftw, NULL); + return 1; + } - fftBackwardX = fftwf_plan_many_dft(1, &mbin, nchan * nfft, intermediateX, &mbin, 1, mbin, in1, &mbin, 1, mbin, FFTW_BACKWARD, FFTW_ESTIMATE_PATIENT); - fftBackwardY = fftwf_plan_many_dft(1, &mbin, nchan * nfft, intermediateY, &mbin, 1, mbin, in2, &mbin, 1, mbin, FFTW_BACKWARD, FFTW_ESTIMATE_PATIENT); + if (windowGenerator(fftw->chirpData, coherentDM, reader->metadata->ftop_raw, reader->metadata->subband_bw, mbin, nsub, channelisation, window)) { + fprintf(stderr, "ERROR: Failed to generate window, exiting.\n"); + CLICleanup(config, outConfig, fftw, NULL); + return 1; + } } - size_t outputFloats = reader->packetsPerIteration * UDPNTIMESLICE * reader->meta->totalProcBeamlets / (spectralDownsample ?: 1); + float *outputStokes[MAX_OUTPUT_DIMS]; + ARR_INIT(outputStokes, MAX_OUTPUT_DIMS, NULL); + + size_t outputFloats = reader->packetsPerIteration * correlateScale * UDPNTIMESLICE * reader->meta->totalProcBeamlets / (spectralDownsample ?: 1); for (int8_t i = 0; i < numStokes; i++) { outputStokes[i] = calloc(outputFloats, sizeof(float)); + CHECK_ALLOC(outputStokes[i], 1, + fprintf(stderr, "ERROR: Failed to allocate input FFTW buffers, exiting.\n"); + CLICleanup(config, outConfig, fftw, outputStokes);); } if (silent == 0) { @@ -654,13 +1027,13 @@ int main(int argc, char *argv[]) { int64_t expectedWriteSize[MAX_OUTPUT_DIMS]; // lofar_udp_io_write_setup_helper(lofar_udp_io_write_config *config, int64_t outputLength[], int8_t numOutputs, int32_t iter, int64_t firstPacket) for (int8_t i = 0; i < numStokes; i++) { - expectedWriteSize[i] = reader->packetsPerIteration * UDPNTIMESLICE * reader->meta->totalProcBeamlets / downsampling * sizeof(float); + expectedWriteSize[i] = reader->packetsPerIteration * correlateScale * UDPNTIMESLICE * reader->meta->totalProcBeamlets / downsampling * sizeof(float); } if ((returnVal = lofar_udp_io_write_setup_helper(outConfig, expectedWriteSize, numStokes, 0, startingPacket)) < 0) { fprintf(stderr, "ERROR: Failed to open an output file (%ld, errno %d: %s), breaking.\n", returnVal, errno, strerror(errno)); returnValMeta = (returnValMeta < 0 && returnValMeta > -7) ? returnValMeta : -7; - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, outputStokes); return 1; } @@ -688,64 +1061,62 @@ int main(int argc, char *argv[]) { // Write out the desired amount of packets; cap if needed. packetsToWrite = reader->meta->packetsPerIteration; + if (packetsToWrite != config->packetsPerIteration) { + fprintf(stderr, "ERROR: Failed to get requested number of packets (requested %ld, got %ld, exiting.\n", config->packetsPerIteration, packetsToWrite); + break; + } if (splitEvery == LONG_MAX && maxPackets < packetsToWrite) { packetsToWrite = maxPackets; } - printf("Begin channelisation %d %d %d %d (%d)\n", channelisation, spectralDownsample, downsampling, nfft, nfft * nbin); + VERBOSE(printf("Begin channelisation %d %d %d %d %d %d %d (%d)\n", channelisation, spectralDownsample, downsampling, nfft, nbin, noverlap, mbin, nfft * nbin)); // Perform channelisation, temporal downsampling as needed - if (channelisation > 1) { + float *beamletJones = (reader->meta->calibrateData == GENERATE_JONES) ? reader->meta->jonesMatrices[reader->meta->calibrationStep] : NULL; + if (channelisation > 1 || window & COHERENT_DEDISP) { CLICK(tickChan); - printf("Copy %ld\n", nbin * nsub * nfft * 2 * sizeof(float)); - memcpy(in1, reader->meta->outputData[0], nbin * nsub * nfft * 2 * sizeof(float)); - memcpy(in2, reader->meta->outputData[1], nbin * nsub * nfft * 2 * sizeof(float)); - printf("Execute\n"); - fftwf_execute(fftForwardX); - fftwf_execute(fftForwardY); - printf("Reorder %ld\n", (int64_t) nbin * nfft * nsub); - reorderData(intermediateX, intermediateY, nbin * nfft, nsub); - //windowData(); - printf("Reorder %ld\n", (int64_t) mbin * nfft * nsub * channelisation); - reorderData(intermediateX, intermediateY, mbin * nfft, nsub * channelisation); - printf("Cleanup %ld\n", (int64_t) nbin * nsub * nfft * 2); - ARR_INIT(((float *) in1),nbin * nsub * nfft * 2,0.0f); - ARR_INIT(((float *) in2),nbin * nsub * nfft * 2,0.0f); - printf("Execute\n"); - fftwf_execute(fftBackwardX); - fftwf_execute(fftBackwardY); + if (localLoops == 0) { + padNextIteration(inFFTArrs, (const int8_t **) reader->meta->outputData, beamletJones, nbin, noverlap, nsub, -1 * nfft); + } + overlapAndPad(inFFTArrs, (const int8_t **) reader->meta->outputData, beamletJones, nbin, noverlap, nsub, nfft); + fftwf_execute(fftw->fftForwardX); + fftwf_execute(fftw->fftForwardY); + reorderData(fftw->intermediateX, fftw->intermediateY, nbin, nsub * nfft); + windowData(fftw->intermediateX, fftw->intermediateY, fftw->chirpData, mbin, nsub, channelisation, nfft); + reorderData(fftw->intermediateX, fftw->intermediateY, mbin, nchan * nfft); + fftwf_execute(fftw->fftBackwardX); + fftwf_execute(fftw->fftBackwardY); CLICK(tockChan); timing[4] = TICKTOCK(tickChan, tockChan); totalChanTime += timing[4]; CLICK(tickDetect); - printf("Detect\n"); - transposeDetect(in1, in2, outputStokes, mbin * nfft, channelisation, nsub, spectralDownsample, stokesParameters); + transposeDetect(fftw->in1, fftw->in2, outputStokes, mbin, noverlap, nfft, channelisation, nsub, spectralDownsample, stokesParameters); + padNextIteration(inFFTArrs, (const int8_t **) reader->meta->outputData, beamletJones, nbin, noverlap, nsub, nfft); } else { CLICK(tickDetect); - printf("Detectsolo\n"); - transposeDetect(in1, in2, outputStokes, mbin * nfft, channelisation, nsub, spectralDownsample, stokesParameters); + transposeDetect((fftwf_complex *) reader->meta->outputData[0], (fftwf_complex *) reader->meta->outputData[1], outputStokes, reader->meta->packetsPerIteration * UDPNTIMESLICE, 0, 1, 1, nsub, 1, stokesParameters); } - ARR_INIT(((float *) intermediateX), (int64_t) nbin * nsub * nfft * 2,0.0f); - ARR_INIT(((float *) intermediateY), (int64_t) nbin * nsub * nfft * 2,0.0f); + CLICK(tockDetect); timing[5] = TICKTOCK(tickDetect, tockDetect); totalDetectTime += timing[5]; - printf("Begin downsampling\n"); + VERBOSE(printf("Begin downsampling\n")); if (downsampling > 1 && !spectralDownsample) { CLICK(tickDown); - temporalDownsample(outputStokes, numStokes, mbin * nfft, nchan, downsampling); + const int64_t samples = (mbin - (2 * noverlap) / channelisation) * nfft; + temporalDownsample(outputStokes, numStokes, samples, nchan * correlateScale, downsampling); CLICK(tockDown); timing[6] = TICKTOCK(tickDown, tockDown); totalDownsampleTime += timing[6]; } - printf("Finish downsampling %d\n", numStokes); + VERBOSE(printf("Finish downsampling %d\n", numStokes)); for (int8_t out = 0; out < numStokes; out++) { - printf("Enter loop\n"); + VERBOSE(printf("Enter loop\n")); if (reader->metadata != NULL) { if (reader->metadata->type != NO_META) { CLICK(tick1); - if ((returnVal = lofar_udp_metadata_write_file(reader, outConfig, out, reader->metadata, headerBuffer, 4096 * 8, localLoops == 0)) < + if ((returnVal = lofar_udp_metadata_write_file(reader, outConfig, out, reader->metadata, reader->metadata->headerBuffer, DEF_HDR_LEN, localLoops == 0)) < 0) { fprintf(stderr, "ERROR: Failed to write header to output (%ld, errno %d: %s), breaking.\n", returnVal, errno, strerror(errno)); returnValMeta = (returnValMeta < 0 && returnValMeta > -4) ? returnValMeta : -4; @@ -755,15 +1126,13 @@ int main(int argc, char *argv[]) { timing[2] += TICKTOCK(tick1, tock1); } } - printf("Sizing\n"); CLICK(tick0); - size_t outputLength = packetsToWrite * UDPNTIMESLICE * reader->meta->totalProcBeamlets / downsampling * sizeof(float); - VERBOSE(printf("Writing %ld bytes (%ld packets) to disk for output %d...\n", - outputLength, packetsToWrite, out)); + size_t outputLength = ((nbin_valid * nfft * correlateScale * nsub / downsampling) - floatWriteOffset) * sizeof(float); + VERBOSE(printf("Writing %ld bytes (%ld packets, offset %ld) to disk for output %d...\n", + outputLength, packetsToWrite, sizeof(float) * floatWriteOffset, out)); size_t outputWritten; - printf("Writing...\n"); - if ((outputWritten = lofar_udp_io_write(outConfig, out, (int8_t *) outputStokes[out], + if ((outputWritten = lofar_udp_io_write(outConfig, out, (int8_t *) &(outputStokes[out][floatWriteOffset]), outputLength)) != outputLength) { fprintf(stderr, "ERROR: Failed to write data to output (%ld bytes/%ld bytes writen, errno %d: %s)), breaking.\n", outputWritten, outputLength, errno, strerror(errno)); returnValMeta = (returnValMeta < 0 && returnValMeta > -5) ? returnValMeta : -5; @@ -774,6 +1143,8 @@ int main(int argc, char *argv[]) { } + floatWriteOffset = 0; + if (splitEvery != LONG_MAX && returnValMeta > -2) { if (!((localLoops + 1) % splitEvery)) { if (!silent) printf("Hit splitting condition; closing writers and re-opening for iteration %ld.\n", localLoops / splitEvery); @@ -802,6 +1173,8 @@ int main(int argc, char *argv[]) { printf("Detection completed for operation %d after %f seconds.\n", loops, timing[5]); if (channelisation) printf("Channelisation completed for operation %d after %f seconds.\n", loops, timing[4]); if (downsampling) printf("Temporal downsampling completed for operation %d after %f seconds.\n", loops, timing[6]); + printf("Overall real-time factor of %.3fx\n", timePerGulp / (timing[0] + timing[1] + timing[2] + timing[3] + timing[4] + timing[5] + timing[6])); + ARR_INIT(timing, TIMEARRLEN, 0.0); @@ -837,13 +1210,6 @@ int main(int argc, char *argv[]) { } - fftwf_destroy_plan(fftForwardX); - fftwf_destroy_plan(fftForwardY); - fftwf_destroy_plan(fftBackwardX); - fftwf_destroy_plan(fftBackwardY); - fftwf_cleanup_threads(); - fftwf_cleanup(); - CLICK(tock); int64_t droppedPackets = 0, totalPacketLength = 0, totalOutLength = 0; @@ -857,12 +1223,14 @@ int main(int argc, char *argv[]) { for (int port = 0; port < reader->meta->numPorts; port++) droppedPackets += reader->meta->portTotalDroppedPackets[port]; + const float processedTime = (packetsProcessed * UDPNTIMESLICE) * sampleTime; printf("Reader loop exited (%ld); overall process took %f seconds.\n", returnVal, TICKTOCK(tick, tock)); printf("We processed %ld packets, representing %.03lf seconds of data", packetsProcessed, - (float) (reader->meta->numPorts * packetsProcessed * UDPNTIMESLICE) * 5.12e-6f); + (float) (reader->meta->numPorts * packetsProcessed * UDPNTIMESLICE) * sampleTime); if (reader->meta->numPorts > 1) { - printf(" (%.03lf per port)\n", (float) (packetsProcessed * UDPNTIMESLICE) * 5.12e-6f); + printf(" (%.03lf per port)\n", (float) (packetsProcessed * UDPNTIMESLICE) * sampleTime); } else { printf(".\n"); } + printf("The data was processed with a real-time factor of %.2f\n.", processedTime / TICKTOCK(tick, tock)); printf("Total Read Time:\t%3.02lf s\t\t\tTotal CPU Ops Time:\t%3.02lf s\nTotal Write Time:\t%3.02lf s\t\t\tTotal MetaD Time:\t%3.02lf s\n", totalReadTime, totalOpsTime, totalWriteTime, totalMetadataTime); printf("Total Channelisation Time:\t%3.02lf s\t\t\tTotal Detection Time:\t%3.02lf s\nTotal Downsampling Time:\t%3.02lf s\n", totalChanTime, totalDetectTime, totalDownsampleTime); @@ -879,15 +1247,14 @@ int main(int argc, char *argv[]) { // Clean-up the reader object, also closes the input files for us - lofar_udp_reader_cleanup(reader); + ARB_FREE_NOT_NULL(reader, lofar_udp_reader_cleanup); if (silent == 0) { printf("Reader cleanup performed successfully.\n"); } // Cleanup the writer object, close any outputs - lofar_udp_io_write_cleanup(outConfig, 1); - outConfig = NULL; + ARB_FREE_NOT_NULL(outConfig, lofar_udp_io_write_cleanup, 1); // Free our malloc'd objects - CLICleanup(config, outConfig, headerBuffer, intermediateX, intermediateY); + CLICleanup(config, outConfig, fftw, outputStokes); if (silent == 0) { printf("CLI memory cleaned up successfully. Exiting.\n"); } return 0; diff --git a/src/docker/Dockerfile b/src/docker/Dockerfile index 15e36d7e..a1e371e2 100644 --- a/src/docker/Dockerfile +++ b/src/docker/Dockerfile @@ -1,15 +1,13 @@ -FROM ubuntu:latest +FROM ubuntu:24.04 SHELL ["/usr/bin/env", "bash", "-c"] -ENV SOFT /home/soft -RUN mkdir -p $SOFT/udpPacketManager/ +ENV SOFT="/home/soft" +RUN mkdir -p "${SOFT}/udpPacketManager/" -ARG BUILD_CORES=8 +ARG BUILD_CORES="8" ARG OPT_ARCH="native" ARG DEBIAN_FRONTEND=noninteractive -COPY . $SOFT/udpPacketManager/ -RUN find $SOFT/udpPacketManager/src # Get the latest version of in-active-development software @@ -19,21 +17,25 @@ RUN find $SOFT/udpPacketManager/src WORKDIR /home/soft -ENV CC="clang-15" -ENV CXX="clang++-15" +ENV CC_VER="18" +ENV CC="clang-${CC_VER}" +ENV CXX="clang++-${CC_VER}" # Build and install udpPacketManager/mockHeader, riptide, PSRSalsa, CDMT, IQRM, PSRDADA-python WORKDIR $SOFT/udpPacketManager RUN echo "Building udpPacketManager" && \ \ apt-get update && \ - apt-get install -y autoconf ${CC} ${CXX} curl csh git make libomp5-15 libomp-15-dev libtool wget && \ - curl -sSL https://bootstrap.pypa.io/get-pip.py -o get-pip.py && \ - python3 get-pip.py && \ - python3 -m pip install cmake && \ - git clean -f -x && git reset --hard HEAD && \ + apt-get install -y autoconf ${CC} ${CXX} curl csh git make libomp5-${CC_VER} libomp-${CC_VER}-dev libtool python3-pip python3-venv wget && \ + rm -rf /var/lib/apt/lists/* && \ + python3 -m venv /home/soft/venv/ && \ + source /home/soft/venv/bin/activate && \ + python3 -m pip install cmake + +COPY . $SOFT/udpPacketManager/ +RUN source /home/soft/venv/bin/activate && \ + # git clean -f -x && git reset --hard HEAD && \ find ./src && \ - ./build.sh 1 && \ - rm -rf /var/lib/apt/lists/* + ./build.sh 2 WORKDIR /home/user ENTRYPOINT /bin/bash diff --git a/src/docker/Dockerfile_dev b/src/docker/Dockerfile_dev new file mode 100644 index 00000000..bfb93a3b --- /dev/null +++ b/src/docker/Dockerfile_dev @@ -0,0 +1,16 @@ +FROM nvidia/cuda:12.6.1-devel-ubuntu22.04 + +# docker build --platform linux/amd64 --build-arg UID=$(id -u) --build-arg GID=$(id -g) --build-arg NAME=$(whoami) . + +ARG UID=1000 +ARG GID=1000 +ARG NAME=builder +RUN useradd -m -u "${UID}" -g "${GID}" -s /bin/bash "${NAME}" + +RUN apt-get update \ + && apt-get install -y clang clang gdb git libomp-dev libomp5 python3-pip \ + && apt-get install -y git autoconf csh libtool make wget \ + && python3 -m pip install cmake + +USER ${NAME} +RUN git config --global --add safe.directory "*" diff --git a/src/lib/io/lofar_udp_io_DADA.c b/src/lib/io/lofar_udp_io_DADA.c index e514edb0..a1aaa8b5 100644 --- a/src/lib/io/lofar_udp_io_DADA.c +++ b/src/lib/io/lofar_udp_io_DADA.c @@ -277,7 +277,7 @@ int32_t _lofar_udp_io_write_setup_DADA(lofar_udp_io_write_config *const config, multilog_add(config->dadaWriter[outp].multilog, stdout); } - // Initialuse a ringbuffer, followed by a HDU + // Initialise a ringbuffer, followed by a HDU if (config->dadaWriter[outp].hdu == NULL) { // ringbuffer if (_lofar_udp_io_write_setup_DADA_ringbuffer(config->outputDadaKeys[outp], @@ -297,6 +297,10 @@ int32_t _lofar_udp_io_write_setup_DADA(lofar_udp_io_write_config *const config, return -1; } + if (dada_hdu_lock_write(config->dadaWriter[outp].hdu) < 0) { + return -1; + } + if (sprintf(config->outputLocations[outp], "PSRDADA:%d", config->outputDadaKeys[outp]) < 0) { fprintf(stderr, "ERROR: Failed to copy output file path (PSRDADA:%d), exiting.\n", config->outputDadaKeys[outp]); @@ -390,15 +394,17 @@ int64_t _lofar_udp_io_write_DADA(ipcio_t *const ringbuffer, const int8_t *src, c // If performing a data write, if (!ipcbuf) { // Open as the active writer - if (ipcio_open(ringbuffer, 'W') < 0) { - return -1; - } + // Now performed when we create the IO object + // if (ipcio_open(ringbuffer, 'W') < 0) { + // return -1; + // } // Write the data int64_t writtenBytes = ipcio_write(ringbuffer, (char *) src, nchars); // Unlock the buffer and continue - if (ipcio_close(ringbuffer) < 0 || writtenBytes < 0) { + // if (ipcio_close(ringbuffer) < 0 || writtenBytes < 0) { + if (writtenBytes < 0) { return -1; } @@ -407,9 +413,9 @@ int64_t _lofar_udp_io_write_DADA(ipcio_t *const ringbuffer, const int8_t *src, c // If opening as a header, get the buffer and lock as a writer // This is possible as an ipcio_t starts with an ipcbuf_t, followed by extra data ipcbuf_t *buffer = (ipcbuf_t *) ringbuffer; - if (ipcbuf_lock_write(buffer) < 0) { - return -1; - } + // if (ipcbuf_lock_write(buffer) < 0) { + // return -1; + // } // While data in our buffer remains, fill up ringbuffer pages int64_t written = 0; while (nchars) { @@ -428,9 +434,9 @@ int64_t _lofar_udp_io_write_DADA(ipcio_t *const ringbuffer, const int8_t *src, c } // Unlock the buffer and continue - if (ipcbuf_unlock_write(buffer) < 0) { - return -1; - } + // if (ipcbuf_unlock_write(buffer) < 0) { + // return -1; + // } return written; } #else @@ -523,7 +529,7 @@ void _lofar_udp_io_cleanup_DADA_loop(ipcbuf_t *buff, float *timeoutPtr) { float timeout = *timeoutPtr; float totalSleep = 0.001f * (float) sampleEveryNMilli; int64_t iters = 0; - // Hold a reference to the lat read buffer, we may be stuck within 1-2 blocks of the last write, + // Hold a reference to the last read buffer, we may be stuck within 1-2 blocks of the last write, // in which case we will need to force an exit uint64_t previousBuffer = ipcbuf_get_read_count(buff); diff --git a/src/lib/io/lofar_udp_io_HDF5.c b/src/lib/io/lofar_udp_io_HDF5.c index 55262192..b8953118 100644 --- a/src/lib/io/lofar_udp_io_HDF5.c +++ b/src/lib/io/lofar_udp_io_HDF5.c @@ -634,6 +634,11 @@ int64_t _lofar_udp_io_write_metadata_HDF5(lofar_udp_io_write_config *const confi return -1; } + // Reducing string length to keep stack below 4k + char antenna_set_str[shortStrLen / 2]; + char generated_str[shortStrLen / 2]; + strncpy(antenna_set_str, metadata->freq_raw > 100 ? "HBA_JOINED" : "LBA_OUTER", shortStrLen / 2 - 1); + strncpy(generated_str, metadata->upm_reader == DADA_ACTIVE ? "ONLINE" : "OFFLINE", shortStrLen / 2 - 1); const strKeyStrVal rootStrAttrs[] = { { "GROUPTYPE", "Root" }, { "FILETYPE", "bf" }, @@ -654,6 +659,8 @@ int64_t _lofar_udp_io_write_metadata_HDF5(lofar_udp_io_write_config *const confi { "SYSTEM_VERSION", UPM_VERSION }, { "PIPELINE_VERSION", UPM_VERSION }, // TODO? { "BF_VERSION", "NULL" }, // TODO + { "ANTENNA_SET", {*antenna_set_str} }, + { "CREATE_OFFLINE_ONLINE", {*generated_str} }, }; H5G_SET_ATTRS(group, rootStrAttrs, hdf5SetupStrAttrs); @@ -670,10 +677,8 @@ int64_t _lofar_udp_io_write_metadata_HDF5(lofar_udp_io_write_config *const confi { "OBSERVATION_ID", metadata->obs_id }, { "OBSERVATION_START_UTC", metadata->obs_utc_start }, { "EXPTIME_START_UTC", metadata->obs_utc_start }, - { "ANTENNA_SET", metadata->freq_raw > 100 ? "HBA_JOINED" : "LBA_OUTER" }, - { "CREATE_OFFLINE_ONLINE", metadata->upm_reader == DADA_ACTIVE ? "ONLINE" : "OFFLINE" }, { "TARGET", metadata->source }, - { "FILTER_SELECTION", filterSelection }, + { "FILTER_SELECTION", &(filterSelection[0]) }, }; H5G_SET_ATTRS(group, rootStrPtrAttrs, hdf5SetupStrPtrAttrs); @@ -1128,7 +1133,7 @@ int64_t _lofar_udp_io_write_metadata_HDF5(lofar_udp_io_write_config *const confi hsize_t maxdims[2] = { H5S_UNLIMITED, H5S_UNLIMITED }; hsize_t chunk_dims[2] = { 4096, 32 }; H5_ERR_CHECK(status, H5Pset_chunk(prop, rank, chunk_dims)); - char dsetName[DEF_STR_LEN], componentStr[16] = ""; + char componentStr[16] = ""; const char delim = '-'; char *component = NULL; @@ -1166,6 +1171,7 @@ int64_t _lofar_udp_io_write_metadata_HDF5(lofar_udp_io_write_config *const confi config->hdf5Writer.hdf5DSetWriter[outputs].dims[1] = metadata->nchan; H5_ERR_CHECK(dataspace, H5Screate_simple(rank, config->hdf5Writer.hdf5DSetWriter->dims, maxdims)); + char dsetName[DEF_STR_LEN] = ""; if (snprintf(dsetName, DEF_STR_LEN - 1, "/SUB_ARRAY_POINTING_000/BEAM_000/STOKES_%d", i) < 1) { fprintf(stderr, "ERROR: Failed to print dataset name for dset %d, exiting.\n", i); return -1; diff --git a/src/lib/io/lofar_udp_io_ZSTD.c b/src/lib/io/lofar_udp_io_ZSTD.c index 717f8bfc..a0d7fc8f 100644 --- a/src/lib/io/lofar_udp_io_ZSTD.c +++ b/src/lib/io/lofar_udp_io_ZSTD.c @@ -46,7 +46,7 @@ _lofar_udp_io_read_setup_ZSTD(lofar_udp_io_read_config *const input, const char return -2; } - if (madvise(tmpPtr, fileSize, MADV_SEQUENTIAL) == -1) { + if (madvise(tmpPtr, fileSize, MADV_SEQUENTIAL | MADV_WILLNEED) == -1) { fprintf(stderr, "ERROR: Failed to advise the kernel on mmap read strategy on port %d. Errno: %d. Exiting.\n", port, errno); return -3; @@ -56,14 +56,14 @@ _lofar_udp_io_read_setup_ZSTD(lofar_udp_io_read_config *const input, const char // Setup the decompressed data buffer/struct // If the size is pre-set, we have already resize the true buffer as required int64_t bufferSize = input->decompressionTracker[port].size != 0 ? - (int64_t) input->decompressionTracker[port].size : input->readBufSize[port]; + (int64_t) input->decompressionTracker[port].size : input->readBufSize[port]; if (bufferSize % ZSTD_DStreamOutSize() && !(input->readerType == ZSTDCOMPRESSED_INDIRECT)) { fprintf(stderr, "ERROR %s: Zstandard buffer was not initialised correctly, exiting.\n", __func__); return -1; } else { bufferSize += _lofar_udp_io_read_ZSTD_fix_buffer_size(bufferSize, 1); VERBOSE(printf("reader_setup: expanding decompression buffer by %ld bytes\n", - bufferSize - input->readBufSize[port])); + bufferSize - input->readBufSize[port])); } input->decompressionTracker[port].pos = 0; // Initialisation for our step-by-step reader @@ -104,20 +104,23 @@ int64_t _lofar_udp_io_read_ZSTD(lofar_udp_io_read_config *const input, int8_t po size_t returnVal; // Move the trailing end of the buffer to the start of the target read - int8_t *dest = input->decompressionTracker[port].dst; + const int8_t *dest = input->decompressionTracker[port].dst; dataRead = input->decompressionTracker[port].pos - input->zstdLastRead[port]; + // Copy the preread position to optimise madvise calls later + const size_t previousReadPos = input->readingTracker[port].pos; + VERBOSE(printf("reader_nchars %d: start of ZSTD read loop, %ld, %ld, %ld, %ld, %ld\n", port, input->readingTracker[port].pos, - input->readingTracker[port].size, + input->readingTracker[port].size, input->decompressionTracker[port].pos, dataRead, nchars);); if (input->readerType == ZSTDCOMPRESSED) { dest = targetArray; - int64_t ptrByteDifference = targetArray - (int8_t *) input->decompressionTracker[port].dst; + const int64_t ptrByteDifference = targetArray - (int8_t *) input->decompressionTracker[port].dst; if (ptrByteDifference < 0 || ptrByteDifference > (int64_t) input->decompressionTracker[port].size) { fprintf(stderr, "ERROR %s: Passed buffer is not within pre-set buffer range (delta %ld), exiting.\n", __func__, ptrByteDifference); return -1; @@ -125,7 +128,7 @@ int64_t _lofar_udp_io_read_ZSTD(lofar_udp_io_read_config *const input, int8_t po } // memmove as we can't use memcpy for the ZSTANDARD moe due to potential overlapping buffer components - if (memmove(dest, &(((int8_t *) input->decompressionTracker[port].dst)[input->zstdLastRead[port]]), dataRead) != dest) { + if (memmove((int8_t *) dest, &(((int8_t *) input->decompressionTracker[port].dst)[input->zstdLastRead[port]]), dataRead) != dest) { fprintf(stderr, "ERROR: Failed to copy end of ZSTD buffer, exiting.\n"); return -1; } @@ -149,10 +152,10 @@ int64_t _lofar_udp_io_read_ZSTD(lofar_udp_io_read_config *const input, int8_t po previousDecompressionPos = input->decompressionTracker[port].pos; // zstd streaming decompression + check for errors returnVal = ZSTD_decompressStream(input->dstream[port], &(input->decompressionTracker[port]), - &(input->readingTracker[port])); + &(input->readingTracker[port])); if (ZSTD_isError(returnVal)) { fprintf(stderr, "ZSTD encountered an error decompressing a frame (code %ld, %s), exiting data read early.\n", - returnVal, ZSTD_getErrorName(returnVal)); + returnVal, ZSTD_getErrorName(returnVal)); break; } @@ -168,8 +171,8 @@ int64_t _lofar_udp_io_read_ZSTD(lofar_udp_io_read_config *const input, int8_t po if (input->decompressionTracker[port].pos == input->decompressionTracker[port].size) { fprintf(stderr, - "Failed to read %ld/%ld chars on port %d before filling the buffer. Attempting to continue...\n", - dataRead, nchars, port); + "Failed to read %ld/%ld chars on port %d before filling the buffer. Attempting to continue...\n", + dataRead, nchars, port); break; } } @@ -183,17 +186,37 @@ int64_t _lofar_udp_io_read_ZSTD(lofar_udp_io_read_config *const input, int8_t po } // Completed or EOF: unmap used memory and return everything we read - // TODO: MADV_DONTNEED causes significant slow down for multi-hour observations, test performance of MADV_FREE - // TODO Might this be because we're applying to the full range, rather than just the block we just read? - // TODO: Commenting out this entire block, the memory is marked as sequential, so in theory this is not required at all + // TODO: MADV_DONTNEED causes significant slow down for multi-hour observations, test performance of MADV_FREE vs MADV_DONTNEED + // Previously commented out at it wasn't thought the be needed, unfortunately it is. + // Large observations were holding > 50% of a systems memory, as compared to < 3GB in 0.6 builds. //if (madvise(((void *) input->readingTracker[port].src), input->readingTracker[port].pos, MADV_DONTNEED) < 0) { - /* - if (madvise(((void *) input->readingTracker[port].src), input->readingTracker[port].pos, MADV_FREE) < 0) { - fprintf(stderr, - "ERROR: Failed to apply MADV_DONTNEED after read operation on port %d (errno %d: %s).\n", port, - errno, strerror(errno)); + if (previousReadPos < input->readingTracker[port].pos) { + const size_t pageSize = getpagesize(); + const size_t previousReadMadviseOffset = previousReadPos - (previousReadPos % pageSize); + void *startAddress = ((void *) input->readingTracker[port].src) + previousReadMadviseOffset; + size_t memoryPages = (input->readingTracker[port].pos - previousReadMadviseOffset) / pageSize; + + // Don't try to apply madvise to too much data + if ((previousReadMadviseOffset + memoryPages * pageSize) > input->readingTracker[port].size) { + memoryPages -= 1; + } + + if (memoryPages > 0) { + // madvise calls must be page aligned, determine the offsets to apply to all of the current page at the start, but not the end of the page at the end + if (madvise((void *) startAddress, memoryPages * pageSize, input->zstdMadvise) < 0) { + fprintf(stderr, + "WARNING: Failed to apply %d after read operation on port %d (errno %d: %s).\n", input->zstdMadvise, port, + errno, strerror(errno)); + + if (madvise(((void *) input->readingTracker[port].src) + previousReadPos, input->readingTracker[port].pos - previousReadPos, + input->zstdMadvise) < + 0) { + fprintf(stderr, "ERROR: Fallback madvise %d on full range failed to apply, exiting.\n", input->zstdMadvise); + return -1; + } + } + } } - */ input->zstdLastRead[port] = (dest - (int8_t*) input->decompressionTracker[port].dst) + nchars; // Copy data for the indirect reader @@ -257,7 +280,7 @@ void _lofar_udp_io_read_cleanup_ZSTD(lofar_udp_io_read_config *const input, cons * @return >0: bytes read, <=-1: Failure */ int64_t _lofar_udp_io_read_temp_ZSTD(void *outbuf, const int64_t size, const int64_t num, const char inputFile[], - const int8_t resetSeek) { + const int8_t resetSeek) { VERBOSE(printf("%s: %s, %ld\n", __func__, inputFile, strlen(inputFile))); // Open the underlying file FILE *inputFilePtr = fopen(inputFile, "rb"); @@ -434,7 +457,7 @@ _lofar_udp_io_write_ZSTD(lofar_udp_io_write_config *const config, const int8_t o while((returnVal = ZSTD_compressStream2(config->zstdWriter[outp].cstream, &output, &input, ZSTD_e_continue))) { if (ZSTD_isError(returnVal)) { fprintf(stderr, "ERROR: Failed to compressed data with ZSTD (%ld, %s)\n", returnVal, - ZSTD_getErrorName(returnVal)); + ZSTD_getErrorName(returnVal)); return -1; } } @@ -444,7 +467,7 @@ _lofar_udp_io_write_ZSTD(lofar_udp_io_write_config *const config, const int8_t o while((returnVal = ZSTD_compressStream2(config->zstdWriter[outp].cstream, &output, &input, ZSTD_e_flush))) { if (ZSTD_isError(returnVal)) { fprintf(stderr, "ERROR: Failed to finish frame for ZSTD (%ld, %s)\n", returnVal, - ZSTD_getErrorName(returnVal)); + ZSTD_getErrorName(returnVal)); return -1; } } @@ -477,7 +500,7 @@ void _lofar_udp_io_write_cleanup_ZSTD(lofar_udp_io_write_config *const config, c if (fullClean && config->zstdWriter[outp].cstream != NULL) { ZSTD_freeCStream(config->zstdWriter[outp].cstream); config->zstdWriter[outp].cstream = NULL; - + if (config->zstdWriter[outp].compressionBuffer.dst != NULL) { FREE_NOT_NULL(config->zstdWriter[outp].compressionBuffer.dst); } diff --git a/src/lib/lofar_udp_backends.cpp b/src/lib/lofar_udp_backends.cpp index 7259cb62..5ba8ec13 100644 --- a/src/lib/lofar_udp_backends.cpp +++ b/src/lib/lofar_udp_backends.cpp @@ -308,8 +308,46 @@ int32_t lofar_udp_cpp_loop_interface(lofar_udp_obs_meta *meta) { case STOKES_IV_DS16_TIME: return lofar_udp_raw_loop(meta); + // Power X / Y + case POWER_XY: + return lofar_udp_raw_loop(meta); + case POWER_XY_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_TIME: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16_REV: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16_TIME: + return lofar_udp_raw_loop(meta); + default: - fprintf(stderr, "Unknown processing mode %d (%d, %d). Exiting.\n", processingMode, + fprintf(stderr, "Unknown processing mode %d (bitmode %d, calibration %d). Exiting.\n", processingMode, inputBitMode, calibrateData); return 2; } @@ -595,8 +633,47 @@ int32_t lofar_udp_cpp_loop_interface(lofar_udp_obs_meta *meta) { return lofar_udp_raw_loop(meta); + // Power X / Y + case POWER_XY: + return lofar_udp_raw_loop(meta); + case POWER_XY_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_TIME: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16_REV: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16_TIME: + return lofar_udp_raw_loop(meta); + + default: - fprintf(stderr, "Unknown processing mode %d (%d, %d). Exiting.\n", processingMode, + fprintf(stderr, "Unknown processing mode %d (bitmode %d, calibration %d). Exiting.\n", processingMode, inputBitMode, calibrateData); return 2; } @@ -879,14 +956,52 @@ int32_t lofar_udp_cpp_loop_interface(lofar_udp_obs_meta *meta) { case STOKES_IV_DS16_TIME: return lofar_udp_raw_loop(meta); + // Power X / Y + case POWER_XY: + return lofar_udp_raw_loop(meta); + case POWER_XY_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_TIME: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16_REV: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16_TIME: + return lofar_udp_raw_loop(meta); + default: - fprintf(stderr, "Unknown processing mode %d (%d, %d). Exiting.\n", processingMode, + fprintf(stderr, "Unknown processing mode %d (bitmode %d, calibration %d). Exiting.\n", processingMode, inputBitMode, calibrateData); return 2; } default: - fprintf(stderr, "Unexpected bitmode %d (%d, %d). Exiting.\n", inputBitMode, processingMode, + fprintf(stderr, "Unexpected bitmode %d (processing mode %d, calibration %d). Exiting.\n", inputBitMode, processingMode, calibrateData); return 2; } @@ -1178,8 +1293,47 @@ int32_t lofar_udp_cpp_loop_interface(lofar_udp_obs_meta *meta) { case STOKES_IV_DS16_TIME: return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY: + return lofar_udp_raw_loop(meta); + case POWER_XY_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_TIME: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16_REV: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16_TIME: + return lofar_udp_raw_loop(meta); + default: - fprintf(stderr, "Unknown processing mode %d (%d, %d). Exiting.\n", processingMode, + fprintf(stderr, "Unknown processing mode %d (bitmode %d, calibration %d). Exiting.\n", processingMode, inputBitMode, calibrateData); return 2; } @@ -1465,8 +1619,47 @@ int32_t lofar_udp_cpp_loop_interface(lofar_udp_obs_meta *meta) { case STOKES_IV_DS16_TIME: return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY: + return lofar_udp_raw_loop(meta); + case POWER_XY_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_TIME: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16_REV: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16_TIME: + return lofar_udp_raw_loop(meta); + default: - fprintf(stderr, "Unknown processing mode %d (%d, %d). Exiting.\n", processingMode, + fprintf(stderr, "Unknown processing mode %d (bitmode %d, calibration %d). Exiting.\n", processingMode, inputBitMode, calibrateData); return 2; } @@ -1753,14 +1946,53 @@ int32_t lofar_udp_cpp_loop_interface(lofar_udp_obs_meta *meta) { case STOKES_IV_DS16_TIME: return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY: + return lofar_udp_raw_loop(meta); + case POWER_XY_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_TIME: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8_REV: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16_REV: + return lofar_udp_raw_loop(meta); + + // Power X / Y + case POWER_XY_DS2_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS4_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS8_TIME: + return lofar_udp_raw_loop(meta); + case POWER_XY_DS16_TIME: + return lofar_udp_raw_loop(meta); + default: - fprintf(stderr, "Unknown processing mode %d (%d, %d). Exiting.\n", processingMode, + fprintf(stderr, "Unknown processing mode %d (bitmode %d, calibration %d). Exiting.\n", processingMode, inputBitMode, calibrateData); return 2; } default: - fprintf(stderr, "Unexpected bitmode %d (%d, %d). Exiting.\n", inputBitMode, processingMode, + fprintf(stderr, "Unexpected bitmode %d (procmode %d, calibration %d). Exiting.\n", inputBitMode, processingMode, calibrateData); return 2; } diff --git a/src/lib/lofar_udp_backends.hpp b/src/lib/lofar_udp_backends.hpp index e3c4c71f..9ed78094 100644 --- a/src/lib/lofar_udp_backends.hpp +++ b/src/lib/lofar_udp_backends.hpp @@ -33,12 +33,13 @@ extern const int8_t bitmodeConversion[256][2]; // Declare the function to calculate a calibrated sample // Calculates one output component of: -// [(x_1, y_1i), (x_2, y_2i)] . [(s_1, s_2i)] = J . (X,Y).T -// [(x_3, y_3i), (x_4, y_4i)] [(s_3, s_4i)] +// [(x_1 + y_1i), (x_2 + y_2i)] . [(s_1, s_2i)] = J . (X,Y).T +// [(x_3 + y_3i), (x_4 + y_4i)] [(s_3, s_4i)] // // === // [(x_1 * s_1 - y_1 * s_2 + x_2 * s_3 - y_2 * s_4), i (x_1 * s_2 + y_1 * s_1 + x_2 * s_4 + y_2 * s_3)] = (X_r, X_i) // [(x_3 * s_1 - y_3 * s_2 + x_4 * s_3 - y_4 * s_4), i (x_3 * s_2 + y_3 * s_1 + x_4 * s_4 + y_4 * s_3)] = (Y_r, Y_i) + #pragma omp declare simd static inline float calibrateSample(float c_1, float c_2, float c_3, float c_4, float c_5, float c_6, float c_7, float c_8) { return (c_1 * c_2) + (c_3 * c_4) + (c_5 * c_6) + (c_7 * c_8); @@ -65,6 +66,11 @@ static inline float stokesV(float Xr, float Xi, float Yr, float Yi) { return 2.0f * ((Xr * Yi) - (Xi * Yr)); } +#pragma omp declare simd +static inline float polPower(float r, float i) { + return (r * r) + (i * i); +} + #ifdef __cplusplus } @@ -763,7 +769,7 @@ static inline void udp_fullStokesDecimation(int64_t iLoop, const int8_t *inputPo } } -// Stokes I/Q Parameter output, in a given order (varies wildly based on input parameters) +// Stokes I/V Parameter output, in a given order (varies wildly based on input parameters) template static inline void udp_usefulStokes(int64_t iLoop, const int8_t *inputPortData, O **outputData, int64_t lastInputPacketOffset, int64_t packetOutputLength, @@ -806,7 +812,7 @@ udp_usefulStokes(int64_t iLoop, const int8_t *inputPortData, O **outputData, int } } -// Stokes I/Q Parameter output, in a given order (varies wildly based on input parameters), with downsampling by a factor up to 16 +// Stokes I/V Parameter output, in a given order (varies wildly based on input parameters), with downsampling by a factor up to 16 template static inline void udp_usefulStokesDecimation(int64_t iLoop, const int8_t *inputPortData, O **outputData, int64_t lastInputPacketOffset, int64_t packetOutputLength, int32_t timeStepSize, int32_t totalBeamlets, int32_t upperBeamlet, @@ -860,6 +866,95 @@ static inline void udp_usefulStokesDecimation(int64_t iLoop, const int8_t *input } +// Power X/Y Parameter output, in a given order (varies wildly based on input parameters) +template +static inline void +udp_splitPower(int64_t iLoop, const int8_t *inputPortData, O **outputData, int64_t lastInputPacketOffset, int64_t packetOutputLength, + int32_t timeStepSize, int32_t totalBeamlets, int32_t upperBeamlet, int32_t cumulativeBeamlets, + int64_t packetsPerIteration, int32_t baseBeamlet, const float *jonesMatrix) { + + const int64_t outputPacketOffset = outputPacketOffsetCalc(iLoop, packetOutputLength); + + O Xr[UDPNTIMESLICE], Xi[UDPNTIMESLICE], Yr[UDPNTIMESLICE], Yi[UDPNTIMESLICE]; + const float *beamletJones; + + + for (int32_t beamlet = baseBeamlet; beamlet < upperBeamlet; beamlet++) { + const int64_t tsInOffsetBase = input_offset_index(lastInputPacketOffset, beamlet, timeStepSize); + const int64_t tsOutOffsetBase = outputTsOffsetBaseCalc(outputPacketOffset, totalBeamlets, beamlet, baseBeamlet, cumulativeBeamlets, packetsPerIteration); + + TYPE_AND_CAL_SETUP(I, O); + + FALLBACK_LOOP_OPTIMISATION + for (int32_t ts = 0; ts < UDPNTIMESLICE; ts++) { + const int64_t tsInOffset = ts * UDPNPOL; + const int64_t tsOutOffset = outputTsOffsetCalc(tsOutOffsetBase, ts, totalBeamlets); + + if constexpr (calibrateData) { + calibrateDataFunc(&Xr[ts], &Xi[ts], &Yr[ts], &Yi[ts], beamletJones, castPtr, tsInOffset); + + outputData[0][tsOutOffset] = polPower(Xr[ts], Xi[ts]); + outputData[1][tsOutOffset] = polPower(Yr[ts], Yi[ts]); + } else { + outputData[0][tsOutOffset] = polPower(castPtr[tsInOffset], + castPtr[tsInOffset + 1]); + outputData[1][tsOutOffset] = polPower(castPtr[tsInOffset + 2], + castPtr[tsInOffset + 3]); + } + } + } +} + +// Power X/Y Parameter output, in a given order (varies wildly based on input parameters), with downsampling by a factor up to 16 +template +static inline void udp_splitPowerDecimation(int64_t iLoop, const int8_t *inputPortData, O **outputData, int64_t lastInputPacketOffset, + int64_t packetOutputLength, int32_t timeStepSize, int32_t totalBeamlets, int32_t upperBeamlet, + int32_t cumulativeBeamlets, int64_t packetsPerIteration, int32_t baseBeamlet, + const float *jonesMatrix) { + + const int64_t outputPacketOffset = outputPacketOffsetCalc(iLoop, packetOutputLength); + + O tempValX, tempValY; + O Xr[UDPNTIMESLICE], Xi[UDPNTIMESLICE], Yr[UDPNTIMESLICE], Yi[UDPNTIMESLICE]; + const float *beamletJones; + + for (int32_t beamlet = baseBeamlet; beamlet < upperBeamlet; beamlet++) { + const int64_t tsInOffsetBase = input_offset_index(lastInputPacketOffset, beamlet, timeStepSize); + const int64_t tsOutOffsetBase = outputTsOffsetBaseCalc(outputPacketOffset, totalBeamlets, beamlet, baseBeamlet, cumulativeBeamlets, packetsPerIteration); + + TYPE_AND_CAL_SETUP(I, O); + + tempValX = 0.0f; + tempValY = 0.0f; + + FALLBACK_LOOP_OPTIMISATION + for (int32_t ts = 0; ts < static_cast(UDPNTIMESLICE / factor); ts++) { + const int64_t tsOutOffset = outputTsOffsetCalc(tsOutOffsetBase, ts, totalBeamlets); + for (int32_t tss = 0; tss < factor; tss++) { + const int32_t calIdx = (tss + ts * factor); + const int64_t tsInOffset = calIdx * UDPNPOL; + + if constexpr (calibrateData) { + calibrateDataFunc(&Xr[calIdx], &Xi[calIdx], &Yr[calIdx], &Yi[calIdx], beamletJones, castPtr, tsInOffset); + + tempValX += polPower(Xr[calIdx], Xi[calIdx]); + tempValY += polPower(Yr[calIdx], Yi[calIdx]); + } else { + tempValX += polPower(castPtr[tsInOffset], + castPtr[tsInOffset + 1]); + tempValY += polPower(castPtr[tsInOffset + 2], + castPtr[tsInOffset + 3]); + } + } + outputData[0][tsOutOffset] = tempValX; + outputData[1][tsOutOffset] = tempValY; + tempValX = 0.0f; + tempValY = 0.0f; + } + } +} + + // Define the main processing loop template @@ -881,7 +976,7 @@ int32_t lofar_udp_raw_loop(lofar_udp_obs_meta *meta) { int32_t packetLoss = 0; // Get number of OpenMP Threads - const int32_t nThreads = omp_get_num_threads(); + const int32_t nThreads = omp_get_max_threads(); VERBOSE(const int32_t verbose = meta->VERBOSE); // Calculate the true processing mode (4-bit -> +4000) @@ -1467,6 +1562,55 @@ int32_t lofar_udp_raw_loop(lofar_udp_obs_meta *meta) { cumulativeBeamlets, outputPacketsPerIteration, baseBeamlet, jonesMatrix); + } else if constexpr (trueState == POWER_XY) { + + udp_splitPower(iLoop, inputPortData, outputData, + lastInputPacketOffset, + packetOutputLength, timeStepSize, + totalBeamlets, upperBeamlet, + cumulativeBeamlets, + outputPacketsPerIteration, baseBeamlet, + jonesMatrix); + } else if constexpr (trueState == POWER_XY_REV) { + udp_splitPower(iLoop, inputPortData, outputData, + lastInputPacketOffset, + packetOutputLength, timeStepSize, + totalBeamlets, upperBeamlet, + cumulativeBeamlets, + outputPacketsPerIteration, baseBeamlet, + jonesMatrix); + } else if constexpr (trueState == POWER_XY_TIME) { + udp_splitPower(iLoop, inputPortData, outputData, + lastInputPacketOffset, + packetOutputLength, timeStepSize, + totalBeamlets, upperBeamlet, + cumulativeBeamlets, + outputPacketsPerIteration, baseBeamlet, + jonesMatrix); + } else if constexpr (trueState >= POWER_XY_DS2 && trueState <= POWER_XY_DS16) { + udp_splitPowerDecimation(iLoop, inputPortData, outputData, + lastInputPacketOffset, + packetOutputLength, timeStepSize, + totalBeamlets, upperBeamlet, + cumulativeBeamlets, + outputPacketsPerIteration, baseBeamlet, + jonesMatrix); + } else if constexpr (trueState >= POWER_XY_DS2_REV && trueState <= POWER_XY_DS16_REV) { + udp_splitPowerDecimation(iLoop, inputPortData, outputData, + lastInputPacketOffset, + packetOutputLength, timeStepSize, + totalBeamlets, upperBeamlet, + cumulativeBeamlets, + outputPacketsPerIteration, baseBeamlet, + jonesMatrix); + } else if constexpr (trueState >= POWER_XY_DS2_TIME && trueState <= POWER_XY_DS16_TIME) { + udp_splitPowerDecimation(iLoop, inputPortData, outputData, + lastInputPacketOffset, + packetOutputLength, timeStepSize, + totalBeamlets, upperBeamlet, + cumulativeBeamlets, + outputPacketsPerIteration, baseBeamlet, + jonesMatrix); } else { fprintf(stderr, "Unknown processing mode %d, exiting.\n", state); diff --git a/src/lib/lofar_udp_general.h.in b/src/lib/lofar_udp_general.h.in index 000bb1ef..f0a743d7 100644 --- a/src/lib/lofar_udp_general.h.in +++ b/src/lib/lofar_udp_general.h.in @@ -149,6 +149,25 @@ typedef enum { STOKES_V_DS16_TIME = 334, STOKES_IQUV_DS16_TIME = 354, STOKES_IV_DS16_TIME = 364, + + POWER_XY = 170, + POWER_XY_REV = 270, + POWER_XY_TIME = 370, + + POWER_XY_DS2 = 171, + POWER_XY_DS4 = 172, + POWER_XY_DS8 = 173, + POWER_XY_DS16 = 174, + + POWER_XY_DS2_REV = 271, + POWER_XY_DS4_REV = 272, + POWER_XY_DS8_REV = 273, + POWER_XY_DS16_REV = 274, + + POWER_XY_DS2_TIME = 371, + POWER_XY_DS4_TIME = 372, + POWER_XY_DS8_TIME = 373, + POWER_XY_DS16_TIME = 374, } processMode_t; // Internal references to output data's rough ordering @@ -348,7 +367,8 @@ typedef struct __attribute__((__packed__)) lofar_source_bytes { (sizeof((arrayName)) / sizeof((arrayName)[0])) // Free if non-null macro -#define FREE_NOT_NULL(x) if ((x) != NULL) { free((x)); (x) = NULL; } +#define ARB_FREE_NOT_NULL(x, func, ...) if ((x) != NULL) { func((x), ##__VA_ARGS__); (x) = NULL; } +#define FREE_NOT_NULL(x) ARB_FREE_NOT_NULL((x), free) // Silence warnings about parameters not used in certain constexpr paths // https://stackoverflow.com/a/1486931 diff --git a/src/lib/lofar_udp_io.c b/src/lib/lofar_udp_io.c index ea9b76a6..7f34d42e 100644 --- a/src/lib/lofar_udp_io.c +++ b/src/lib/lofar_udp_io.c @@ -342,6 +342,39 @@ int32_t _lofar_udp_io_write_internal_lib_setup_helper(lofar_udp_io_write_config return lofar_udp_io_write_setup_helper(config, fakeOutputLength, reader->meta->numOutputs, iter, reader->meta->lastPacket); } +/** + * @brief Setup a writer based on the processing information for the current meta struct + * + * This could be a replacement for the above call, but it is left in for backwards compatibility + * + * @param config Writer config + * @param meta Configured obs_meta struct + * @param iter Current output iteration + * + * @return 0: Success, -1: Failure + */ +int32_t _lofar_udp_io_write_internal_meta_setup_helper(lofar_udp_io_write_config *config, lofar_udp_obs_meta *meta, int32_t iter) { + if (config == NULL || meta == NULL) { + fprintf(stderr, "ERROR %s: passed null input configuration (config: %p, meta: %p), exiting.\n", __func__, config, meta); + return -1; + } + + if (meta->packetsPerIteration < 1) { + fprintf(stderr, "ERROR %s: Input packetsPerIteration is not initialised (%ld), exiting.\n", __func__, meta->packetsPerIteration); + return -2; + } + for (int8_t outp = 0; outp < config->numOutputs; outp++) { + if (meta->packetOutputLength[outp] < 1) { + fprintf(stderr, "ERROR %s: packetOutputLength[%d] is not initialised (%d), exiting.\n", __func__, outp, meta->packetOutputLength[outp]); + return -3; + } + config->writeBufSize[outp] = meta->packetsPerIteration * meta->packetOutputLength[outp]; + } + int64_t fakeOutputLength[1] = { LONG_MIN }; + + return lofar_udp_io_write_setup_helper(config, fakeOutputLength, meta->numOutputs, iter, meta->lastPacket); +} + // Cleanup functions @@ -488,7 +521,7 @@ reader_t lofar_udp_io_parse_type_optarg(const char *optargc, char *fileFormat, i VERBOSE(printf("%s, HDF5\n", optargc)); reader = HDF5; } else { - fprintf(stderr, "WARNING %s: No filename hints found, assuming input is a normal file.\n", __func__); + fprintf(stderr, "No filename hints found, assuming path %s is to a normal file.\n", optargc); reader = NORMAL; } @@ -647,7 +680,7 @@ int32_t lofar_udp_io_parse_format(char *dest, const char format[], int32_t port, } } - int32_t returnBool = (dest != strncpy(dest, formatCopySrc, DEF_STR_LEN)) || notrigger; + const int32_t returnBool = (dest != strncpy(dest, formatCopySrc, DEF_STR_LEN)) || notrigger; FREE_NOT_NULL(formatCopyOne); FREE_NOT_NULL(formatCopyTwo); FREE_NOT_NULL(prefix); FREE_NOT_NULL(suffix); return returnBool > 0 ? -1 : 0; diff --git a/src/lib/lofar_udp_io.h b/src/lib/lofar_udp_io.h index 9fae8dab..7720588e 100644 --- a/src/lib/lofar_udp_io.h +++ b/src/lib/lofar_udp_io.h @@ -50,6 +50,7 @@ void lofar_udp_io_write_cleanup(lofar_udp_io_write_config *config, int8_t fullCl int32_t _lofar_udp_io_read_setup_internal_lib_helper(lofar_udp_io_read_config *const input, const lofar_udp_config *config, lofar_udp_obs_meta *meta, int8_t port); int32_t _lofar_udp_io_write_internal_lib_setup_helper(lofar_udp_io_write_config *config, lofar_udp_reader *reader, int32_t iter); +int32_t _lofar_udp_io_write_internal_meta_setup_helper(lofar_udp_io_write_config *config, lofar_udp_obs_meta *meta, int32_t iter); int32_t _lofar_udp_io_read_setup_FILE(lofar_udp_io_read_config *const input, const char *inputLocation, int8_t port); int32_t diff --git a/src/lib/lofar_udp_metadata.c b/src/lib/lofar_udp_metadata.c index 9866c0bf..9be8f731 100644 --- a/src/lib/lofar_udp_metadata.c +++ b/src/lib/lofar_udp_metadata.c @@ -99,6 +99,10 @@ int32_t lofar_udp_metadata_setup(lofar_udp_metadata *const metadata, const lofar return -1; } + return _lofar_udp_metadata_setup_types(metadata); +} + +int32_t _lofar_udp_metadata_setup_types(lofar_udp_metadata *const metadata) { // If the output format isn't PSRDADA or HDF5, setup the target struct const processMode_t guppiMode = TIME_MAJOR_FULL; const processMode_t stokesMin = STOKES_I; @@ -717,7 +721,7 @@ int32_t _lofar_udp_metadata_parse_reader(lofar_udp_metadata *const metadata, con if (warning != dataIncreasing) { fprintf(stderr, "WARNING %s: Input metadata is not monotonically increasing or decreasing (as comparing beamlets (%hd and %hd). Metadata result may be incorrect.\n", - __func__, beamlet, ((int16_t) beamlet - 1)); + __func__, beamlet, (int16_t) (beamlet - 1)); warning = 0; } else { warning = 1; diff --git a/src/lib/lofar_udp_metadata.h b/src/lib/lofar_udp_metadata.h index 144fa36d..0a3d2245 100644 --- a/src/lib/lofar_udp_metadata.h +++ b/src/lib/lofar_udp_metadata.h @@ -45,6 +45,7 @@ int64_t lofar_udp_metadata_write_file(const lofar_udp_reader *reader, lofar_udp_ int32_t _lofar_udp_metadata_get_station_name(int stationID, char *stationCode); //int lofar_udp_metadata_setup_DADA(lofar_udp_metadata *metadata); // Main setup call populates the DADA struct +int32_t _lofar_udp_metadata_setup_types(lofar_udp_metadata *metadata); int32_t _lofar_udp_metadata_setup_GUPPI(lofar_udp_metadata *metadata); int32_t _lofar_udp_metadata_setup_SIGPROC(lofar_udp_metadata *metadata); int32_t _lofar_udp_metadata_setup_DADA(__attribute__((unused)) lofar_udp_metadata *metadata); diff --git a/src/lib/lofar_udp_reader.c b/src/lib/lofar_udp_reader.c index b7ef690b..79daae84 100644 --- a/src/lib/lofar_udp_reader.c +++ b/src/lib/lofar_udp_reader.c @@ -768,6 +768,8 @@ int32_t _lofar_udp_setup_processing(lofar_udp_obs_meta *meta) { case STOKES_V_REV ... STOKES_V_DS16_REV: case STOKES_IQUV_REV ... STOKES_IQUV_DS16_REV: case STOKES_IV_REV ... STOKES_IV_DS16_REV: + case POWER_XY ... POWER_XY_DS16: + case POWER_XY_REV ... POWER_XY_DS16_REV: meta->dataOrder = FREQUENCY_MAJOR; break; @@ -779,6 +781,7 @@ int32_t _lofar_udp_setup_processing(lofar_udp_obs_meta *meta) { case STOKES_V_TIME ... STOKES_V_DS16_TIME: case STOKES_IQUV_TIME ... STOKES_IQUV_DS16_TIME: case STOKES_IV_TIME ... STOKES_IV_DS16_TIME: + case POWER_XY_TIME ... POWER_XY_DS16_TIME: meta->dataOrder = TIME_MAJOR; break; @@ -875,6 +878,9 @@ int32_t _lofar_udp_setup_processing(lofar_udp_obs_meta *meta) { case STOKES_IV ... STOKES_IV_DS16: case STOKES_IV_REV ... STOKES_IV_DS16_REV: case STOKES_IV_TIME ... STOKES_IV_DS16_TIME: + case POWER_XY ... POWER_XY_DS16: + case POWER_XY_REV ... POWER_XY_DS16_REV: + case POWER_XY_TIME ... POWER_XY_DS16_TIME: meta->numOutputs = 2; meta->outputBitMode = 32; // 4 input words -> 2 larger word @@ -961,12 +967,19 @@ int32_t _lofar_udp_reader_config_check(const lofar_udp_config *config) { } for (int8_t port = 0; port < config->numPorts; port++) { - if (!strlen(config->inputLocations[port])) { - fprintf(stderr, "ERROR: You requested %d ports, but port %d is an empty string, exiting.\n", config->numPorts, port); - return -1; - } else if (access(config->inputLocations[port], F_OK) != 0) { - fprintf(stderr, "ERROR: Failed to open file at %s (port %d), exiting.\n", config->inputLocations[port], port); - return -1; + if (!(config->readerType & DADA_ACTIVE)) { + if (!strlen(config->inputLocations[port])) { + fprintf(stderr, "ERROR: You requested %d ports, but port %d is an empty string, exiting.\n", config->numPorts, port); + return -1; + } else if (access(config->inputLocations[port], F_OK) != 0) { + fprintf(stderr, "ERROR: Failed to open file at %s (port %d), exiting.\n", config->inputLocations[port], port); + return -1; + } + } else { + if (config->inputDadaKeys[port] < 0) { + fprintf(stderr, "ERROR: You requested %d ports, but the DADA key for port %d is unset, exiting.\n", config->numPorts, config->inputDadaKeys[port]); + return -1; + } } } @@ -1453,6 +1466,7 @@ int32_t lofar_udp_reader_step_timed(lofar_udp_reader *reader, double timing[2]) // outputDataReady is negative -> out of order packets -> need to re-run for time-major modes if (reader->meta->outputDataReady < 0 && reader->meta->dataOrder == TIME_MAJOR) { fprintf(stderr, "WARNING: Re-running processing due to out of order packets and time-major processing mode.\n"); + // TODO: Recover packet order, drop packets + re-fill buffer as needed. stepReturnVal = lofar_udp_cpp_loop_interface(reader->meta); } else { return stepReturnVal; diff --git a/src/lib/lofar_udp_structs.c b/src/lib/lofar_udp_structs.c index 18cb262b..dec3c915 100644 --- a/src/lib/lofar_udp_structs.c +++ b/src/lib/lofar_udp_structs.c @@ -28,6 +28,7 @@ const lofar_udp_io_read_config lofar_udp_io_read_config_default = { .readingTracker = { { NULL, 0, 0 } }, // NEEDS FULL RUNTIME INITIALISATION .decompressionTracker = { { NULL, 0, 0 } }, // NEEDS FULL RUNTIME INITIALISATION .zstdLastRead = { 0 }, // NEEDS FULL RUNTIME INITIALISATION + .zstdMadvise = MADV_DONTNEED, .multilog = { NULL }, // NEEDS FULL RUNTIME INITIALISATION .dadaPageSize = { -1 } // NEEDS FULL RUNTIME INITIALISATION }; diff --git a/src/lib/lofar_udp_structs.h b/src/lib/lofar_udp_structs.h index 52203c2f..a56a3be2 100644 --- a/src/lib/lofar_udp_structs.h +++ b/src/lib/lofar_udp_structs.h @@ -17,6 +17,10 @@ #include #endif // End of NODADA + +// Needed for zstdMadvise initialisation +#include + // PSRDADA include may not be available #ifndef DADA_INCLUDES #define DADA_INCLUDES @@ -67,6 +71,7 @@ typedef struct lofar_udp_io_read_config { ZSTD_inBuffer readingTracker[MAX_NUM_PORTS]; ZSTD_outBuffer decompressionTracker[MAX_NUM_PORTS]; int64_t zstdLastRead[MAX_NUM_PORTS]; + int zstdMadvise; // PSRDADA requirements multilog_t *multilog[MAX_NUM_PORTS]; diff --git a/src/lib/lofar_udp_structs_metadata.c b/src/lib/lofar_udp_structs_metadata.c index aae86d6e..81c3a4d4 100644 --- a/src/lib/lofar_udp_structs_metadata.c +++ b/src/lib/lofar_udp_structs_metadata.c @@ -195,6 +195,7 @@ void lofar_udp_metadata_cleanup(lofar_udp_metadata *meta) { } FREE_NOT_NULL(meta->output.guppi); } + FREE_NOT_NULL(meta->headerBuffer); FREE_NOT_NULL(meta); } diff --git a/src/lib/metadata/lofar_udp_metadata_SIGPROC.c b/src/lib/metadata/lofar_udp_metadata_SIGPROC.c index ab35cccd..912ddb3d 100644 --- a/src/lib/metadata/lofar_udp_metadata_SIGPROC.c +++ b/src/lib/metadata/lofar_udp_metadata_SIGPROC.c @@ -39,7 +39,7 @@ int32_t _lofar_udp_metadata_setup_SIGPROC(lofar_udp_metadata *metadata) { // Constants for what we produce // TODO: Modify if we want support for more than just single(-Stokes) outputs - metadata->output.sigproc->nifs = 1; + metadata->output.sigproc->nifs = metadata->ndim * metadata->npol; metadata->output.sigproc->data_type = 1; // Sigproc pointing variables take the form of a double, which is parsed as ddmmss.ss diff --git a/src/misc/bitshuffleMakefile b/src/misc/bitshuffleMakefile index 775d67a0..3808a731 100644 --- a/src/misc/bitshuffleMakefile +++ b/src/misc/bitshuffleMakefile @@ -2,8 +2,8 @@ CMAKE_BASE_DIR ?= ../../../ CFLAGS += -O3 -ffast-math -std=c99 -static -fno-strict-aliasing -fPIC -DZSTD_SUPPORT -CFLAGS += -I./src -I./lz4 -I$(CMAKE_BASE_DIR)/_deps/zstd-src/lib/ -I$(CMAKE_BASE_DIR)/internal_hdf5-prefix/src/internal_hdf5/src/ -LFLAGS += -L$(CMAKE_BASE_DIR)/_deps/zstd-src/build/cmake -lzstd +CFLAGS += -I./src -I./lz4 -I$(CMAKE_BASE_DIR)/_deps/zstd-src/lib/ -I$(CMAKE_BASE_DIR)/../zstd-src/lib/ -I$(CMAKE_BASE_DIR)/internal_hdf5-prefix/src/internal_hdf5/src/ +LFLAGS += -L$(CMAKE_BASE_DIR)/_deps/zstd-src/build/cmake -L$(CMAKE_BASE_DIR)/../zstd-src/build/cmake -lzstd SRC_INP = $(wildcard ./src/*.c) SRC_INP += $(wildcard ./lz4/*.c) diff --git a/tests/lib_tests/lib_reader_tests.cpp b/tests/lib_tests/lib_reader_tests.cpp index b164046b..e94d2ac7 100644 --- a/tests/lib_tests/lib_reader_tests.cpp +++ b/tests/lib_tests/lib_reader_tests.cpp @@ -21,7 +21,7 @@ lofar_udp_config* config_setup(int32_t sampleMeta = 0, int32_t testNumber = 0, i lofar_udp_config *config = lofar_udp_config_alloc(); EXPECT_NE(nullptr, config); assert(numPorts <= MAX_NUM_PORTS); - config->calibrationDuration = 1.2; + config->calibrationDuration = static_cast(1.2); for (int32_t port = 0; port < testPorts; port++) { std::string workingString = std::regex_replace(inputLocations[testNumber], std::regex("portnum"), std::to_string(port));