multithreading – Timer thread in C

I’m coding in C on Ubuntu.
I need to write a timer thread in C. I have this thread

void *thread(void *arg)
{
    sleep(TIMEOUT);
    pthread_mutex_lock(&mutex);
    // operations
    pthread_mutex_unlock(&mutex);
    return NULL;
}

I need another thread, called for example timerManager, to be able to reset the timer. My first idea was to kill this thread and create another one, but that is a problem: if I kill the timer thread with pthread_cancel while it is waiting for the mutex, I create a deadlock because the mutex stays locked.

What can I do about it?
Thanks to all in advance.
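
For reference, the usual alternative to cancelling the thread is to make the timer wait on something the manager can signal: the wait either times out (run the operations) or is interrupted (reset or stop), so no thread is ever killed while it might hold the mutex. Below is a minimal sketch of that pattern; it is written in Go only for brevity (a select over channels plays the role of a timed wait on a condition variable), and the names runTimer, reset and stop are purely illustrative.

package main

import (
    "fmt"
    "sync"
    "time"
)

// timeout is the illustrative equivalent of TIMEOUT in the question.
const timeout = 5 * time.Second

// runTimer waits for the timeout to elapse; the manager can push to reset
// to restart the countdown, or close stop to abandon it. No thread is ever
// cancelled, so no mutex can be left locked.
func runTimer(reset <-chan struct{}, stop <-chan struct{}, mu *sync.Mutex) {
    t := time.NewTimer(timeout)
    defer t.Stop()
    for {
        select {
        case <-reset:
            // Restart the countdown instead of killing the timer thread.
            if !t.Stop() {
                <-t.C
            }
            t.Reset(timeout)
        case <-stop:
            return
        case <-t.C:
            mu.Lock()
            // operations
            mu.Unlock()
            return
        }
    }
}

func main() {
    var mu sync.Mutex
    reset := make(chan struct{})
    stop := make(chan struct{})
    go runTimer(reset, stop, &mu)

    // The "timerManager" side: reset once, then let the timer expire.
    time.Sleep(2 * time.Second)
    reset <- struct{}{}
    time.Sleep(timeout + time.Second)
    fmt.Println("done")
    close(stop)
}

In C the same shape is a loop around pthread_cond_timedwait on a condition variable that timerManager signals to request a reset or a stop.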

multithreading – Running DNS lookups over a million IP addresses

I need to run a reverse DNS lookup on every IP address in the 10.96.0.0/12 CIDR network.
There are 1,048,574 IP addresses.

I created a multi-threaded program in Go to do it, but even when I use 100 threads it takes a lot of time (I didn't measure it exactly, but it is more than 10 minutes).

Is there a way I can do it more efficiently?

My program:

package main

import (
    "fmt"
    "log"
    "net"
)

func getIPAddresses(cidr string) ([]string, error) {
    ip, ipnet, err := net.ParseCIDR(cidr)
    if err != nil {
        return nil, err
    }

    var ips []string
    for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) {
        ips = append(ips, ip.String())
    }
    // remove network address and broadcast address
    lenIPs := len(ips)
    var ipAddresses []string

    switch {
    case lenIPs < 2:
        ipAddresses = ips

    default:
        // This cannot panic because we checked lenIPs above
        ipAddresses = ips[1 : len(ips)-1]
    }

    return ipAddresses, nil
}

//  http://play.golang.org/p/m8TNTtygK0
func inc(ip net.IP) {
    for j := len(ip) - 1; j >= 0; j-- {
        ip[j]++
        if ip[j] > 0 {
            break
        }
    }
}

func main() {
    ipAddresses, err := getIPAddresses("10.96.0.0/12")
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("(*) Scanning %d IP addressesn", len(ipAddresses))

    //var hostsMap map[string][]string
    //l := sync.Mutex{}
    threads := 100
    sem := make(chan bool, threads)

    count := 0
    for _, ip := range ipAddresses {

        sem <- true

        go func(ip string) {
            count += 1
            if count % 1000 == 0 {
                fmt.Printf("Scan: %d addressesn", count)
            }
            _, err := net.LookupAddr(ip)
            if err != nil {
                //log.Fatal(err)
                //fmt.Println("Error")
            }

            <- sem
        }(ip)
    }

    for i := 0; i < cap(sem); i++ {
        sem <- true
    }

    fmt.Println("DONE")
}
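
Reverse lookups spend almost all of their time waiting on the resolver, so the two usual levers are raising the concurrency well above 100 and bounding each lookup with a timeout so unresponsive addresses fail fast instead of hanging for the resolver's default timeout. Below is a minimal sketch of that shape; the worker count and the 2-second timeout are illustrative values, not recommendations, and in the real program the scanned slice would be the one returned by getIPAddresses above.

package main

import (
    "context"
    "fmt"
    "net"
    "sync"
    "time"
)

// scan resolves reverse DNS for the given IPs with a bounded worker pool and
// a per-lookup timeout. Worker count and timeout are illustrative values.
func scan(ipAddresses []string) map[string][]string {
    const workers = 1000
    resolver := &net.Resolver{}

    jobs := make(chan string)
    hosts := make(map[string][]string)
    var mu sync.Mutex

    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for ip := range jobs {
                ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
                names, err := resolver.LookupAddr(ctx, ip)
                cancel()
                if err == nil && len(names) > 0 {
                    mu.Lock()
                    hosts[ip] = names
                    mu.Unlock()
                }
            }
        }()
    }

    for _, ip := range ipAddresses {
        jobs <- ip
    }
    close(jobs)
    wg.Wait()
    return hosts
}

func main() {
    // In the real program this would be the slice returned by getIPAddresses.
    hosts := scan([]string{"8.8.8.8", "1.1.1.1"})
    fmt.Println(hosts)
}

Pre-building a million-entry slice of strings also costs memory; the jobs channel could just as well be fed directly from the CIDR iteration.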

multithreading – What is the best way to decouple and define the orchestration (coordination) of concurrent tasks?

I have a concurrent data synchronisation algorithm. It does the following: get data and files from the server, send data and files to the server, and save them to the database / filesystem. Imagine the system like this:

  1. You have 1000 functions. Each one does some atomic operation. For instance, fetch the latest objects of type X and insert them into the DB; upload this file of type Y; and so on. Each function is independent and can act on its own; it does not communicate with or affect other functions. On the other hand, none of them is a pure function, because they all use these common resources (fetching data from the server, putting data in the DB, saving files on the filesystem).
  2. You have a single entry point for the synchronization mechanism. The outside of the sync system can start the sync, say, by doing a Sync.start() call. The sync also has a single exit point: it can finish with either success or failure (if any of those functions from (1) fails, the whole sync fails). The outside of the sync system can subscribe to onSyncSuccess / onSyncError events.
  3. You have this black box in the middle of the system. This could be, for instance, a single-threaded algorithm calling those 1000 functions from (1). But I made it concurrent.

Now consider this. This concurrent algorithm is currently rigid, because the way in which the functions are called is hardcoded. If I want to take a bunch of functions from (1) that currently execute sequentially and make them execute in parallel, it would be impossible without refactoring the whole class hierarchy.

I was thinking about the concept of directed acyclic graphs, and I made my own domain-specific language in Kotlin to define such task graphs. Now I can write the whole orchestration declaratively like this:

notifySyncWasStarted()
runSequentialy {
    task { doTask1() }
    runInParallel {
        task { doTask2() }
        task { doTask3() }
    }
    task { doTask4() }
}
notifySyncWasStopped()

So first task1 gets executed, then task2 and task3 at the same time, then task4. By keeping this graph in a single file, I can easily modify the way tasks are executed. For instance, I could easily swap tasks:

notifySyncWasStarted()
runSequentialy {
    runInParallel {
        task { doTask4() }
        task { doTask2() }
    }
    task { doTask3() }
    task { doTask1() }
}
notifySyncWasStopped()

Here, tasks 4 and 2 get executed, then 3, then 1. This works by using the fork-join paradigm: I create threads and then join them into the parent thread.
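
For what it's worth, the two combinators such a DSL needs are small enough to sketch directly. Here is the same runSequentialy / runInParallel shape written in Go rather than Kotlin (goroutines plus a WaitGroup standing in for fork-join); the names sequentially, inParallel and step are illustrative only.

package main

import (
    "fmt"
    "sync"
)

// A task is any step of the sync; an error propagates up and fails the whole run.
type task func() error

// sequentially runs tasks one after another, stopping at the first error.
func sequentially(tasks ...task) task {
    return func() error {
        for _, t := range tasks {
            if err := t(); err != nil {
                return err
            }
        }
        return nil
    }
}

// inParallel forks one goroutine per task and joins them, returning the
// first error observed (all tasks are still allowed to finish).
func inParallel(tasks ...task) task {
    return func() error {
        var wg sync.WaitGroup
        errs := make(chan error, len(tasks))
        for _, t := range tasks {
            wg.Add(1)
            go func(t task) {
                defer wg.Done()
                if err := t(); err != nil {
                    errs <- err
                }
            }(t)
        }
        wg.Wait()
        close(errs)
        return <-errs // nil if no task failed
    }
}

func main() {
    step := func(name string) task {
        return func() error { fmt.Println("running", name); return nil }
    }

    // The whole orchestration is one declarative expression, mirroring
    // runSequentialy { task1; runInParallel { task2; task3 }; task4 }.
    plan := sequentially(
        step("task1"),
        inParallel(step("task2"), step("task3")),
        step("task4"),
    )

    if err := plan(); err != nil {
        fmt.Println("sync failed:", err)
    } else {
        fmt.Println("sync succeeded")
    }
}

The point is that the orchestration becomes a plain value built from two combinators, so swapping tasks means editing one expression rather than a class hierarchy.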

In contrast, right now the algorithm is spread across multiple classes, each of which was designed to run the tasks in a specific manner. Changing how tasks are run would mean refactoring the classes and how they communicate with each other.

The question is: What is the best way to decouple and define the orchestration (coordination) of concurrent tasks, so that the orchestration can easily be changed in the future? Is my solution optimal or the way to go (directed acyclic graphs, fork-join, plus a domain-specific language)? Or are there other design patterns that do the same thing?

multithreading – How to use UDP MulticastSocket for Reception with HandlerThread-Looper-Handler in Android

I am trying to find the right architecture to be able to listen to a UDP multicast socket, receive the datagram packet that is in JSON format, and then send the data to my BroadcastReceiver. It is important that the UDP socket must always listen and store packets, even when the application is minimized.

My current approach is to use a HandlerThread and have the socket listen and interpret the packet within the run() method. However, to keep the socket alive and listening, I am passing this reference as a Runnable within the run method to keep it looping, which I believe is bad practice.

So, what would be the “right” approach, in the context of using a Thread / HandlerThread / Looper / Handler, and which methods should be overridden?

Should I use a Looper, with onLooperPrepared() doing the socket setup etc., and then do all the work in run()? Or maybe just use a Handler to pass simple messages through the MessageQueue and use handleMessage() to interpret the messages and send them to the BroadcastReceiver?

Thank you for your time.
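
To illustrate the split the question is circling around, here is a minimal sketch (in Go rather than Android Java, purely to show the shape): one long-lived loop that only blocks on receive and decodes the JSON, and a message queue that hands the decoded payload to whoever consumes it. On Android those two roles would typically be played by a HandlerThread inside a foreground Service (so reception survives the app being minimized) and a Handler/BroadcastReceiver; the group address and port below are made up.

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net"
)

func main() {
    // Illustrative group address and port; replace with the real ones.
    gaddr, err := net.ResolveUDPAddr("udp", "239.0.0.1:9999")
    if err != nil {
        log.Fatal(err)
    }
    conn, err := net.ListenMulticastUDP("udp", nil, gaddr)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // The receive loop only reads and decodes; a separate consumer decides what
    // to do with the payload (the analogue of handing the data to a Handler /
    // BroadcastReceiver through a message queue).
    packets := make(chan map[string]interface{})

    go func() {
        buf := make([]byte, 64*1024)
        for {
            n, _, err := conn.ReadFromUDP(buf)
            if err != nil {
                close(packets)
                return
            }
            var payload map[string]interface{}
            if err := json.Unmarshal(buf[:n], &payload); err != nil {
                continue // skip malformed datagrams
            }
            packets <- payload
        }
    }()

    for p := range packets {
        fmt.Println("received:", p)
    }
}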

multithreading – QT thread with timer

I have some Qt code that opens a few threads and waits for a response, all timed with a QTimer.
For example:

(thread1 is declared in the header)

thread1 = new QThread();
OpenCVWorker *worker1 = new OpenCVWorker();
QTimer *workerTrigger1 = new QTimer();
workerTrigger1->setInterval(30);

connect(workerTrigger1, SIGNAL(timeout()), worker1, SLOT(receiveFrame()));
connect(thread1, SIGNAL(finished()), worker1, SLOT(deleteLater()));
connect(thread1, SIGNAL(finished()), thread1, SLOT(deleteLater()));
connect(thread1, SIGNAL(finished()), workerTrigger1, SLOT(deleteLater()));
connect(this,SIGNAL(blockOpencvThread()),worker1,SLOT(blockThread()));
connect(this, SIGNAL(sendSetup1(QString)), worker1, SLOT(receiveSetup(QString)));
connect(this, SIGNAL(stopCameras()), worker1, SLOT(receiveToggleStream()));
connect(worker1, SIGNAL(sendFrame(QImage)), this, SLOT(receiveFrame(QImage)));

workerTrigger1->start();
worker1->moveToThread(thread1);
workerTrigger1->moveToThread(thread1);
thread1->start();
emit sendSetup1(Settings::ipCamera1);

When the program is going to end, I call

emit stopCameras();
if(thread1->isRunning())
{
    thread1->quit();
    thread1->wait();
}

Is this correct?
Thank you very much.

cpu – Why isn’t multithreading everywhere?

Not almost all, but all modern CPUs have multiple cores, yet multithreading isn’t really that common. Why have these cores, then? To execute several sequential programs at the same time? Well, when the calculations are complex (rendering, compiling), the program is definitely made to take advantage of multiple cores. But is a single core enough for the other tasks?
I understand that multithreading is hard to implement and has drawbacks if the number of threads is lower than expected. But not using those idle cores seems so irrational.

multithreading – Would implementing workers/threads in my NodeJS app be beneficial?

Okay, so I’m writing a NodeJS Express API around one function for a client. By my standards I don’t believe it’s written ‘the best’, but it does the job. However, I am being asked to add a bulk import method, and since a single request can already take 10–15 seconds to complete, could I somehow implement workers/threads into this?

require('dotenv').config();

const os = require("os");
const asyncRedis = require("async-redis");
const client = asyncRedis.createClient();
const formData = require("express-form-data");
const express = require('express');
const UserAgent = require('random-useragent');
const Apify = require('apify');
const faker = require('fakerator')();
const md5 = require('md5');
const HttpsProxyAgent = require('https-proxy-agent');
const axios = require('axios');
const xml2js = require('xml2js');

const app = express();

const options = {
    uploadDir: os.tmpdir(),
    autoClean: true
};

// parse data with connect-multiparty.
app.use(formData.parse(options));
// delete from the request all empty files (size == 0)
app.use(formData.format());
// change the file objects to fs.ReadStream
app.use(formData.stream());
// union the body and the files
app.use(formData.union());

const urls = {
    'order_flow': 'https://www.website.com/order',
    'validate_recaptcha': 'https://www.website.com/recaptcha',
    'validate_email': 'https://www.website.com/email',
    'verify_email': 'https://www.website.com/email',
    'order_status': 'https://www.website.com/OrderStatus',
    'rapid_api': 'privatix-temp-mail-v1.p.rapidapi.com'
};

const parser = new xml2js.Parser({ explicitArray: false });

let MyUserAgent;

//api vars
let model_info;
let captcha_solution;
let captcha_token;
let order_submit;
let order_confirm;
let userProfile;
let domains;
let emails;
let requestId;
let validationId;
let bitmap;
let emailConfirm;
let proxy;

function msleep(n) {
    Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, n);
}

function sleep(n) {
    msleep(n*1000);
}

function getProxy() {
    return new HttpsProxyAgent('http://username-' + (Math.floor(Math.random() * (50 - 1 + 1)) + 1) + ':password@host')
}

const getModelInfo = async (imei) => {
    return await axios.post(urls['order_flow'], {
        //pointless post data here
    });
}

const get_captcha_solution = async () => {
    return await Apify.callTask(process.env.APIFY_TASK_ID, {
        'key': process.env.ANTI_CAPTCHA_KEY,
        'webUrl': process.env.PROTECTED_WEBSITE_URL,
        'siteKey': process.env.CAPTCHA_DATA_SITEKEY
    });
}

const getEmails = async(username) => {
    return await axios.get('https://' + urls['rapid_api'] + '/request/mail/id/' + md5(username) + '/', {
        'headers': {
            'x-rapidapi-host': urls['rapid_api'],
            'x-rapidapi-key': process.env.RAPIDAPI_KEY
        }
    }, {
        'httpsAgent': proxy
    });
}

const getDomains = async () => {
    return await axios.get('https://' + urls['rapid_api'] + '/request/domains/', {
        'headers': {
            'x-rapidapi-host': urls['rapid_api'],
            'x-rapidapi-key': process.env.RAPIDAPI_KEY,
            'useQueryString': true
        }
    }, {
        'httpsAgent': proxy
    });
}

const submitOrder = async (imei, imeiRefId, make, makerefId, model, modelRefId, token, tokenRefId) => {
    return await axios.post(urls['order_flow'], {
        //pointless post data here
    }, {
        'httpsAgent': proxy
    });
}

const orderConfirmDual = async (dualSim, imei2, email, firstName, imei, imeiRefId, lastName, make, makerefId, model, modelRefId, token, tokenRefId) => {
    return await axios.post(urls['order_flow'], {
        //pointless post data here
    }, {
        'httpsAgent': proxy
    });
};

const orderConfirm = async (email, firstName, imei, imeiRefId, lastName, make, makerefId, model, modelRefId, token, tokenRefId) => {
    return await axios.post(urls['order_flow'], {
        //pointless post data here
    }, {
        'httpsAgent': proxy
    });
};

const validateEmail = async (bitmap, requestId, validationId) => {
    return await axios.post(urls['verify_email'], {
        //pointless post data here
    }, {
        'httpsAgent': proxy
    });
}

const orderStatus = async (imei, orderNumber, token, tokenRefId) => {
    return await axios.post(urls['order_status'], {
        //pointless post data here
    }, {
        'httpsAgent': proxy
    });
}

const validateCaptcha = async (token) => {
    return await axios.post(urls['validate_recaptcha'], {
        //pointless post data here
    }, {
        'httpsAgent': proxy
    });
}

const createOrder = async (req, res) => {
    //config proxy
    proxy = getProxy();

    //params
    let params = await parser.parseStringPromise(req.body.parameters);
    let imei = params.PARAMETERS.IMEI;

    //start script
    MyUserAgent = UserAgent.getRandom(function (ua) {
        return ua.browserName === 'Firefox';
    });
    axios.defaults.withCredentials = true;
    axios.defaults.headers.common['User-Agent'] = MyUserAgent;
    axios.defaults.timeout = 5000;
    userProfile = faker.entity.user();

    try {
        model_info = await getModelInfo(imei);
    } catch(error) {
        //store in redis
        await client.set(imei, JSON.stringify({
            'error': handleError(error)
        }), 'EX', 86400);
        console.log('(model_info) ERROR! : ' + handleError(error));
        return res.send(success_response({
            'MESSAGE': handleError(error)
        }));
    }

    console.log("We retrieved our model info");

    let solution = await client.get('captcha_solution');
    if(solution === null) {

        console.log('Solution was not stored, lets generate one!');

        try {
            captcha_token = await get_captcha_solution();
        } catch(error) {
            return res.send(error_response(handleError(error)));
        }

        console.log("We retrieved a captcha token, let's submit it! ");

        try {
            captcha_solution = await validateCaptcha(captcha_token.output.body);
        } catch(error) {
            return res.send(error_response(handleError(error)));
        }

        await client.set('captcha_solution', JSON.stringify(captcha_solution.data), 'EX', 3600);

        solution = JSON.stringify(captcha_solution.data);

    } else {

        console.log('Solution was stored!');

    }

    captcha_solution = JSON.parse(solution);

    try {
        order_submit = await submitOrder(
            imei,
            model_info.data.imeiRefId,
            model_info.data.make,
            model_info.data.makeRefId,
            model_info.data.model,
            model_info.data.modelRefId,
            captcha_solution.reCaptchaDetail.token,
            captcha_solution.reCaptchaDetail.tokenRefId
        )
    } catch(error) {
        //store in redis
        await client.set(imei, JSON.stringify({
            'error': handleError(error)
        }), 'EX', 86400);
        console.log('(order_submit) ERROR! : ' + handleError(error));
        return res.send(success_response({
            'MESSAGE': handleError(error)
        }));
    }

    console.log('We submitted the order!');

    try {
        domains = await getDomains();
    } catch(error) {
        return res.send(error_response('domain_email_api'));
    }

    userProfile.email = userProfile.userName + domains.data[(Math.random() * domains.data.length) | 0];
    console.log('We have assigned a temporary email ' + userProfile.email);

    try {

        if(order_submit.data.dualSim === false) {
            order_confirm = await orderConfirm(
                userProfile.email,
                userProfile.firstName,
                imei,
                model_info.data.imeiRefId,
                userProfile.lastName,
                model_info.data.make,
                model_info.data.makeRefId,
                model_info.data.model,
                model_info.data.modelRefId,
                captcha_solution.reCaptchaDetail.token,
                captcha_solution.reCaptchaDetail.tokenRefId
            );
        } else {
            order_confirm = await orderConfirmDual(
                order_submit.data.dualSim,
                order_submit.data.imei2,
                userProfile.email,
                userProfile.firstName,
                imei,
                model_info.data.imeiRefId,
                userProfile.lastName,
                model_info.data.make,
                model_info.data.makeRefId,
                model_info.data.model,
                model_info.data.modelRefId,
                captcha_solution.reCaptchaDetail.token,
                captcha_solution.reCaptchaDetail.tokenRefId
            );
        }

    } catch (error) {
        //store in redis
        await client.set(imei, JSON.stringify({
            'error': handleError(error)
        }), 'EX', 86400);
        console.log('(order_confirm) ERROR! : ' + handleError(error));
        return res.send(success_response({
            'MESSAGE': handleError(error)
        }));
    }

    sleep(8);

    try {
        emails = await getEmails(userProfile.email);
    } catch (error) {
        return res.send(error_response('email_api_failed'));
    }

    let matches;
    emails.data.forEach(obj => {
        matches = obj.mail_text_only.match(/requestId=(.*)&validationId=(.*)&bitmap=(.*)"/);
    });
    if(matches.length > 0) {
        requestId = decodeURIComponent(matches[1]);
        validationId = decodeURIComponent(matches[2]);
        bitmap = decodeURIComponent(matches[3]);
    }

    try {
        emailConfirm = await validateEmail(bitmap, requestId, validationId);
    } catch (error) {
        //store in redis
        await client.set(imei, JSON.stringify({
            'error': handleError(error)
        }), 'EX', 86400);
        console.log('(order_confirm) ERROR! : ' + handleError(error));
        return res.send(success_response({
            'MESSAGE': handleError(error)
        }));
    }

    await client.set(imei, JSON.stringify({
        'orderNumber': order_confirm.data.orderId,
        'token': captcha_solution.reCaptchaDetail.token,
        'tokenRefId': captcha_solution.reCaptchaDetail.tokenRefId
    }), 'EX', 86400);

    return res.send(success_response({
        'MESSAGE': 'Order Received!',
        'REFERENCEID': order_confirm.data.requestNo
    }));
}

const handleError = function(error) {
    if(error.code) {
        if(error.code === 'ECONNABORTED') {
            return 'api_server_timeout';
        }
    }
    if(error.response) {
        switch(error.response.status) {
            case 504:
                return 'api_server_timeout';
            case 406:
                return error.response.data.error.details[0].message;
            default:
                return 'server_error';
        }
    }
    return 'unknown_error';
}

function error_response(message) {
    return {
        'ERROR': [{
            'MESSAGE': message
        }],
        "apiversion": "6.1"
    };
}

function success_response(data) {
    return {
        'SUCCESS': [data],
        "apiversion": "6.1"
    };
}

app.post('/api/index.php', async function (req, res) {
    res.setHeader('Content-Type', 'application/json; charset=utf-8');
    res.setHeader('X-Powered-By', 'DHRU-FUSION');
    res.setHeader('dhru-fusion-api-version', '6.1');
    res.removeHeader('pragma');
    res.removeHeader('server');
    res.removeHeader('transfer-encoding');
    res.removeHeader('cache-control');
    res.removeHeader('expires');

    if(Object.keys(req.body).length === 0) {
        return res.send(error_response('Invalid Request'));
    }

    if(typeof req.body.apiaccesskey === 'undefined' || req.body.apiaccesskey !== process.env.SITE_API_KEY) {
        return res.send(error_response('Invalid API Key'));
    }

    switch(req.body.action) {
        case "placeimeiorder":
            return await createOrder(req, res);
        case "getimeiorder":
            //params
            let params = await parser.parseStringPromise(req.body.parameters);

            console.log('getimeiorder!', params);

            if(typeof params.PARAMETERS.ID === 'undefined') {
                return res.send(success_response({
                    'STATUS': 3,
                    'CODE': 'Invalid Order'
                }));
            }
            let imei = params.PARAMETERS.ID;
            let solution = await client.get(imei);
            if(solution === null) {
                return res.send(success_response({
                    'STATUS': 3,
                    'CODE': 'Invalid Order'
                }));
            }
            solution = JSON.parse(solution);
            if(typeof solution.error !== 'undefined') {
                return res.send(success_response({
                    'STATUS': 4,
                    'CODE': solution.error.substring(0, solution.error.indexOf('.'))
                }));
            }
            const result = await orderStatus(imei, solution.orderNumber, solution.token, solution.tokenRefId);
            return res.send(success_response({
                'STATUS': 4,
                'CODE': result.data.unlockStatus + ' - ' + result.data.reason
            }));
        default:
            return res.send(error_response('Invalid Action'));
    }

});

app.listen(3000);

Main Points

/api/index.php is the only request that is handled.

createOrder is the function that I’d like to somehow implement threads/workers into so I could submit MULTIPLE orders via api/index.php?action=createbulkimeiorder
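
Since the per-order work here is almost entirely waiting on HTTP calls, Node usually handles a bulk import with capped Promise concurrency or a job queue rather than worker_threads, which mainly pay off for CPU-bound work. Either way, the shape the bulk endpoint needs is a bounded pool of workers draining a queue of orders and collecting the results. A minimal, language-agnostic sketch of that shape in Go follows; processOrder stands in for the existing createOrder flow, and the IMEIs are made up.

package main

import (
    "fmt"
    "sync"
    "time"
)

// result of processing one item of the bulk request.
type result struct {
    imei string
    err  error
}

// processOrder stands in for the existing createOrder flow for a single IMEI.
func processOrder(imei string) error {
    time.Sleep(200 * time.Millisecond) // simulate the slow upstream calls
    return nil
}

// processBulk handles a bulk import with at most `workers` orders in flight.
func processBulk(imeis []string, workers int) []result {
    jobs := make(chan string)
    out := make(chan result, len(imeis))

    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for imei := range jobs {
                out <- result{imei: imei, err: processOrder(imei)}
            }
        }()
    }

    for _, imei := range imeis {
        jobs <- imei
    }
    close(jobs)
    wg.Wait()
    close(out)

    var results []result
    for r := range out {
        results = append(results, r)
    }
    return results
}

func main() {
    imeis := []string{"111111111111111", "222222222222222", "333333333333333"}
    for _, r := range processBulk(imeis, 2) {
        fmt.Println(r.imei, r.err)
    }
}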

multithreading – Unit-testing a behavior that is thread exclusive

I need to write a unit test for an interface implementation that guarantees thread-exclusive behavior.
The code examples are all written in Delphi, but that shouldn’t matter for the general behavior (at least not on Windows). I assume it is more or less self-explanatory.

The interface looks as follows:

ILockManager = interface(IInterface)
    // returns true if the same context isn't already locked from another thread, 
    // false otherwise.
    function lock(context : string) : boolean;

    // returns true if the same context is already locked from another thread, 
    // false otherwise.
    function isLocked(context : string) : boolean;

    // unlocks context if it is locked by the current thread, does nothing otherwise.
    procedure unlock(context : string);
end;

Now, when writing the unit test, I have encountered the problem that I must hold a locked context from another thread running in the background and verify that lock() returns false in the main thread (respectively true, when testing isLocked()).

My first approach was as follows:

procedure TestLock();
var
    lockingThread : TThread;
    threadProc : TProc;
    mtx : TMutex; // Shared in main thread and background thread
    lockManager : ILockManager;
    lockResult : boolean;
begin
    threadProc := procedure()
        var
            lockManager : ILockManager;
            lockResult : boolean;
        begin
            lockManager := TLockManagerImpl.Create();
            try
                lockResult := lockManager.lock('SomeContext');
                DUnitX.Assert.Assert.AreEqual(true,lockResult);

                // Hold the lock until the mutex is released from the main thread
                mtx.Acquire();
            finally
                lockManager.unlock('SomeContext');
                mtx.Release;
            end;
        end;

    mtx := TMutex.Create();
    lockingThread := TThread.CreateAnonymousThread(threadProc);
    lockManager := TLockManagerImpl.Create();

    try
         mtx.Acquire(); // The background thread should block after the lock is done
         lockingThread.Start();
         TThread.Yield(); // Ensure that the background thread is started
         
         lockResult := lockManager.lock('SomeContext');
         DUnitX.Assert.Assert.AreEqual(false,lockResult);

         // Let the background thread terminate and unlock
         mtx.Release();

         lockResult := lockManager.lock('SomeContext');
         DUnitX.Assert.Assert.AreEqual(true,lockResult);
    finally
        lockManager.unlock('SomeContext');
        mtx.Free();
    end;
end;

But that test fails, because the lock() call is never encountered in the background thread before lock is called in the main thread.

Questions:

Where is my flaw?
Do I need to use another mutex to guarantee the sequential behavior?
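
For comparison, the ordering problem goes away once the background thread explicitly reports that it holds the lock before the main thread asserts anything, and reports again after it has released it; TThread.Yield() only hints at the scheduler and guarantees neither. Below is a sketch of that handshake, written in Go with channels standing in for the events/semaphores you would use in Delphi; the lockManager type is an illustrative stand-in, not the implementation under test.

package main

import (
    "fmt"
    "sync"
)

// lockManager is an illustrative stand-in for the ILockManager under test:
// TryLock plays the role of lock(), Unlock of unlock().
type lockManager struct{ mu sync.Mutex }

func (l *lockManager) TryLock() bool { return l.mu.TryLock() }
func (l *lockManager) Unlock()       { l.mu.Unlock() }

func main() {
    lm := &lockManager{}

    locked := make(chan struct{})   // background -> test: "I hold the lock now"
    release := make(chan struct{})  // test -> background: "you may unlock"
    unlocked := make(chan struct{}) // background -> test: "I released the lock"

    go func() {
        if !lm.TryLock() {
            panic("background thread could not take the lock")
        }
        close(locked) // only now is the precondition of the test established
        <-release     // hold the lock until the test has made its assertion
        lm.Unlock()
        close(unlocked)
    }()

    <-locked // instead of Yield(): wait for proof that the lock is held
    if lm.TryLock() {
        panic("expected the lock to be unavailable")
    }

    close(release)
    <-unlocked // wait until the background thread has really unlocked
    if !lm.TryLock() {
        panic("expected the lock to be available again")
    }
    lm.Unlock()

    fmt.Println("test passed")
}

In the Delphi test, the two signalling channels would correspond to two event objects: one the background thread sets right after lock() succeeds, and one the main thread sets when the background thread may unlock.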

C++ multithreading logger class – Code Review Stack Exchange

I have designed a logger class to log messages to a file. It uses an independent thread to log messages that were previously saved to a queue by the main thread. I would like a review of it.

#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <fstream>
#include <atomic>
#include <string>
using namespace std::chrono_literals;

class Logger
{
    std::mutex queueMutex;
    std::condition_variable condVar;
    std::queue<std::string> messagesQueue;
    std::thread loggingThread;//background process launcher
    std::atomic_bool exit = false;//safety condition
    void processEntries()//the background process
    {
        // Open log file.
        std::ofstream logFile("log.txt");
        if (logFile.fail()) {
            std::cerr << "Failed to open logfile." << std::endl;
            return;
        }

        // Start the processing loop. It processes only one message per iteration
        // to gain more performance.
        while (!exit) {
            std::unique_lock lock(queueMutex);

            // Wait for a notification and don't wakeup unless the queue isn't empty.
            condVar.wait(lock, [this] { return !messagesQueue.empty(); });

            //log to the file
            logFile << messagesQueue.front() << std::endl;
            messagesQueue.pop();
        }

        // At the end, if the queue still has some messages, flush them. No mutex is
        // needed here because we have reached the destructor, i.e. no more messages
        // will be enqueued.
        while(exit && !messagesQueue.empty()){

            //log to the file
            logFile << messagesQueue.front() << std::endl;
            messagesQueue.pop();
        }

    }

public:
    Logger()
    {
        //the default ctor launches the background process task
        loggingThread = std::thread{ &Logger::processEntries, this };
    }
    Logger(const Logger& src) = delete;
    Logger& operator=(const Logger& rhs) = delete;

    //logs the messages to the queue
    void log(std::string_view entry)
    {
        std::unique_lock lock(queueMutex);
        messagesQueue.push(std::string(entry));
        condVar.notify_all();
    }


    ~Logger()
    {
        exit = true;
        loggingThread.join();
    }

};


int main(){

    Logger lg;
    for(int i = 1;i < 10000;i++){
        lg.log("This is the message number " + std::to_string(i));
    }

    std::this_thread::sleep_for(10ms);

    for(int i = 10000;i < 20001;i++){
        lg.log("This is the message number " + std::to_string(i));
    }

}

multithreading – Use cURL and threads to make API calls

Requirement

We use a tool where we have Jython as one of the available scripting languages for making custom scripts. The requirement is that we need to make multiple API calls (in parallel) and aggregate the results into a single JSON document using a script.

Previous attempts

My go-to instinct was to use requests but it didn’t work out. Please refer to the following StackOverflow question.

Jython: Getting SSLError when doing requests get

In my local environment, the simple Java way of doing it worked, but when trying to do the same thing in the tool, it gave me an error.

Working implementation

So, the only other option that we could think of was using cURL. Following is a similar implementation that we have.

Note: I used the jsonplaceholder APIs for doing this test.

import json
import subprocess
from threading import Thread
import pprint


def execute_get_api(url, key, results):
    p1 = subprocess.Popen([
        'curl', '--location', '--request', 'GET', url
    ], stdout=subprocess.PIPE, shell=False)
    out, err = p1.communicate()
    results[key] = json.loads(out)


def get_post(post_id):
    post_url = 'https://jsonplaceholder.typicode.com/posts/%s' % post_id
    comments_url = 'https://jsonplaceholder.typicode.com/posts/%s/comments' % post_id

    # Temporary dictionary to store responses from the API calls
    results = {
        "post": "",
        "comments": ""
    }

    # Making the API calls
    threads = []
    get_posts = Thread(target=execute_get_api,
                       args=(post_url, 'post', results))
    get_comments = Thread(target=execute_get_api,
                          args=(comments_url, 'comments', results))

    get_posts.start()
    get_comments.start()

    threads.append(get_posts)
    threads.append(get_comments)

    for th in threads:
        th.join()

    return results


def driver():
    pp = pprint.PrettyPrinter(indent=4)
    pp.pprint(get_post(1))


if __name__ == '__main__':
    driver()

Output

{   'comments': [   {   u'body': u'laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium',
                        u'email': u'Eliseo@gardner.biz',
                        u'id': 1,
                        u'name': u'id labore ex et quam laborum',
                        u'postId': 1},
                    {   u'body': u'est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et',
                        u'email': u'Jayne_Kuhic@sydney.com',
                        u'id': 2,
                        u'name': u'quo vero reiciendis velit similique earum',
                        u'postId': 1},
                    {   u'body': u'quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione',
                        u'email': u'Nikita@garfield.biz',
                        u'id': 3,
                        u'name': u'odio adipisci rerum aut animi',
                        u'postId': 1},
                    {   u'body': u'non et atque\noccaecati deserunt quas accusantium unde odit nobis qui voluptatem\nquia voluptas consequuntur itaque dolor\net qui rerum deleniti ut occaecati',
                        u'email': u'Lew@alysha.tv',
                        u'id': 4,
                        u'name': u'alias odio sit',
                        u'postId': 1},
                    {   u'body': u'harum non quasi et ratione\ntempore iure ex voluptates in ratione\nharum architecto fugit inventore cupiditate\nvoluptates magni quo et',
                        u'email': u'Hayden@althea.biz',
                        u'id': 5,
                        u'name': u'vero eaque aliquid doloribus et culpa',
                        u'postId': 1}],
    'post': {   u'body': u'quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto',
                u'id': 1,
                u'title': u'sunt aut facere repellat provident occaecati excepturi optio reprehenderit',
                u'userId': 1}}

I’m not sure if there is any other way of doing this. Also, do let me know if this approach can be improved.

Any help will be appreciated 🙂
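
As a reference for the fan-out/fan-in shape itself (not a Jython answer), the same aggregation can be done with a native HTTP client instead of shelling out to cURL. Here is a minimal sketch in Go against the same jsonplaceholder endpoints; on any error the corresponding key is simply left out of the result map.

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "sync"
)

// fetchJSON downloads one endpoint and stores the decoded body under key.
// On any error the key is simply left out of the shared map.
func fetchJSON(url, key string, out map[string]interface{}, mu *sync.Mutex, wg *sync.WaitGroup) {
    defer wg.Done()
    resp, err := http.Get(url)
    if err != nil {
        return
    }
    defer resp.Body.Close()
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return
    }
    var decoded interface{}
    if err := json.Unmarshal(body, &decoded); err != nil {
        return
    }
    mu.Lock()
    out[key] = decoded
    mu.Unlock()
}

func main() {
    results := make(map[string]interface{})
    var mu sync.Mutex
    var wg sync.WaitGroup

    wg.Add(2)
    go fetchJSON("https://jsonplaceholder.typicode.com/posts/1", "post", results, &mu, &wg)
    go fetchJSON("https://jsonplaceholder.typicode.com/posts/1/comments", "comments", results, &mu, &wg)
    wg.Wait()

    aggregated, _ := json.MarshalIndent(results, "", "  ")
    fmt.Println(string(aggregated))
}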