Subject: How to structure code for multiple sessions with async API

From: Sanchay Harneja <sanchay.h_at_gmail.com>
Date: Tue, 27 Oct 2015 01:16:12 -0700

Hi,

I'm developing an application that, every 5 minutes, needs to open ~40-50 ssh
sessions and download some data by running commands. I'm in a
resource-constrained environment and want to minimize latency (downstream
functions get kicked off after the data is downloaded, and they take
time).

So I'm planning to use the libssh2 async API and keep the ssh sessions
alive. I've managed to write code that opens the 50 sessions one after
another in my main thread. After this I kick off 50 commands asynchronously
and use poll to wait on the sockets. So my code looks like this (pseudocode):

open_session(i) {
  sessions[i] = libssh2_session_init();
  socks[i] = open_socket();
  libssh2_session_handshake(sessions[i], socks[i]);
  libssh2_userauth_password(sessions[i]);
}

process_async(i) {
  if (opened[i] == 0) {
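     channels[i] = libssh2_channel_open_session(sessions[i]);
     rc = libssh2_session_last_errno(sessions[i]); // open returns NULL on EAGAIN
     if (channels[i] == NULL && rc == LIBSSH2_ERROR_EAGAIN) { return }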
     opened[i] = 1;
  }
  if (exec[i] == 0) {
     rc = libssh2_channel_exec(channels[i], command);
     if (rc == LIBSSH2_ERROR_EAGAIN) { return }
     exec[i] = 1;
  }
  if (read[i] == 0) {
     do { rc = libssh2_channel_read(channels[i]); } while (rc > 0);
     if (rc == LIBSSH2_ERROR_EAGAIN) { return }
     read[i] = 1;
  }
  if (closed[i] == 0) {
    rc = libssh2_channel_close(channels[i]);
    if (rc == LIBSSH2_ERROR_EAGAIN) { return }
    closed[i] = 1;
  }
  done[i] = 1;
  opened[i] = 0;
  exec[i] = 0;
  read[i] = 0;
  closed[i] = 0;
}

process_all_tasks() {
  for (i=0 to 50) { process_async(i) } // this is fast
  num_done = 0;
  while (num_done < 50) {
     // direction is set using libssh2_session_block_directions
     rc = poll(socks[] ...);
     if (rc <= 0) {error}
     for (i=0 to 50) {
       if (triggered) {
         process_async(i);
         if (done[i]) { num_done++ }
       }
     }
  }
}

main() {
  for (i=0 to 50) { open_session(i) } // this takes time
  while (true) {
    process_all_tasks();
    sleep(5 minutes);
  }
}

First of all, I'm not sure if this is a good way to structure the calls, but
all of this code works; at least I've tested the happy path multiple
times. I can share it as a gist if it helps (it's in C++). Let me know if you
think this is bad or have suggestions.
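
For the poll step above, the bitmask returned by
libssh2_session_block_directions has to be turned into poll events for each
socket. A minimal sketch of that mapping, assuming POSIX poll. The two
constants are copied from libssh2.h so the snippet compiles on its own, and
events_for_directions and make_pollfd are hypothetical helper names, not part
of libssh2:

```cpp
#include <poll.h>

// Values mirrored from libssh2.h so this sketch builds without the library;
// real code would include <libssh2.h> and use its macros directly.
constexpr int kBlockInbound  = 0x0001;  // LIBSSH2_SESSION_BLOCK_INBOUND
constexpr int kBlockOutbound = 0x0002;  // LIBSSH2_SESSION_BLOCK_OUTBOUND

// Translate the bitmask from libssh2_session_block_directions()
// into the events field of a pollfd.
short events_for_directions(int directions) {
    short events = 0;
    if (directions & kBlockInbound)  events |= POLLIN;
    if (directions & kBlockOutbound) events |= POLLOUT;
    return events;
}

// Build the pollfd entry for one session (hypothetical helper).
// A session that is not blocked in either direction gets fd = -1,
// which poll() ignores.
pollfd make_pollfd(int sock, int directions) {
    pollfd p{};
    p.events = events_for_directions(directions);
    p.fd = (p.events != 0) ? sock : -1;
    return p;
}
```

Using fd = -1 for sessions that are not waiting on anything means finished
sessions can stay in the array without waking the loop.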

Now I'm thinking of how to put in libssh2_keepalive_send calls. One thought
I have is to combine the processing and sleeping:

main() {
  for (i=0 to 50) { open_session(i) } // this takes time
  num_done = 0;
  last_done_time = 0;
  while (true) {
    if (num_done >= 50 && (now() - last_done_time >= 5mins)) {
      num_done = 0;
      last_done_time = now();
      for (i=0 to 50) {
        process_async(i);
      }
    }
    // direction is set using libssh2_session_block_directions
    rc = poll(socks[] ...);
    // NOTE: in this case I'm treating timeout as valid
    if (rc < 0) { error }
    for (i=0 to 50) {
       if (triggered && (done[i] == 0)) {
         process_async(i);
         if (done[i]) {num_done++}
       } else {
         libssh2_keepalive_send(sessions[i]);
       }
     }
  }
}

I've tested this and it seems to be working, but I'm not really using the
seconds value that libssh2_keepalive_send outputs.
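
One possible use for that value is as an upper bound on the poll timeout, so
the loop wakes up in time for the next keepalive instead of spinning. A
minimal sketch, assuming each session's seconds_to_next (the second argument
of libssh2_keepalive_send) has been collected into a vector, and that an
interval was set earlier with libssh2_keepalive_config. As far as I
understand the API, a value of 0 means keepalives are not configured for that
session. next_poll_timeout_ms is a hypothetical helper name:

```cpp
#include <algorithm>
#include <climits>
#include <vector>

// Pick the poll() timeout in milliseconds: wake for whichever comes first,
// the next scheduled run or the earliest keepalive that is due.
// seconds_to_next holds the value each libssh2_keepalive_send() call wrote;
// 0 is treated here as "keepalives not configured for this session".
int next_poll_timeout_ms(const std::vector<int>& seconds_to_next,
                         long long ms_until_next_run) {
    long long timeout = std::max(0LL, ms_until_next_run);
    for (int s : seconds_to_next) {
        if (s > 0) timeout = std::min(timeout, s * 1000LL);
    }
    // Clamp so the result fits poll()'s int timeout argument.
    return static_cast<int>(std::min<long long>(timeout, INT_MAX));
}
```

With this, the keepalive call can run once per wakeup for every idle session,
and the returned timeout is passed as the last argument to poll.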

Another issue I have is how to break up this code into smaller modular
parts.
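
One way to split it up, as a rough sketch rather than a recommendation: keep
the per-session progress in a small class with a single stage value instead
of the four flag arrays, and leave the poll loop to drive it. Everything here
(Step, Stage, SessionTask) is a hypothetical name; each callable is assumed
to wrap one libssh2 call and to report EAGAIN as Step::Again:

```cpp
#include <functional>
#include <utility>

// Result of one non-blocking step; Again stands in for LIBSSH2_ERROR_EAGAIN.
enum class Step { Ok, Again, Error };

// The opened/exec/read/closed flags collapsed into one stage value.
enum class Stage { Open, Exec, Read, Close, Done, Failed };

// Hypothetical per-session task. Each callable would wrap one libssh2 call
// (channel open, exec, read until EOF, close) for this session.
class SessionTask {
public:
    using Op = std::function<Step()>;

    SessionTask(Op open, Op exec, Op read, Op close)
        : ops_{std::move(open), std::move(exec),
               std::move(read), std::move(close)} {}

    // Advance as far as possible without blocking; call again after poll().
    Stage advance() {
        while (stage_ != Stage::Done && stage_ != Stage::Failed) {
            Step r = ops_[static_cast<int>(stage_)]();
            if (r == Step::Again) break;  // wait for the socket
            stage_ = (r == Step::Error)
                         ? Stage::Failed
                         : static_cast<Stage>(static_cast<int>(stage_) + 1);
        }
        return stage_;
    }

    bool done() const { return stage_ == Stage::Done; }
    void reset() { stage_ = Stage::Open; }  // start the next 5-minute round

private:
    Op ops_[4];                  // indexed by Stage::Open .. Stage::Close
    Stage stage_ = Stage::Open;
};
```

The main loop then only needs to call advance() on triggered sessions and
reset() on all of them when the next round starts.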

_______________________________________________
libssh2-devel http://cool.haxx.se/cgi-bin/mailman/listinfo/libssh2-devel
Received on 2015-10-27