Fifo non bloquant Linux (journalisation à la demande)

J’aime enregistrer une sortie de programmes «à la demande». Par exemple. la sortie est enregistrée sur le terminal, mais un autre processus peut être associé à la sortie en cours à tout moment.

La manière classique serait:

myprogram 2>&1 | tee /tmp/mylog 

et sur demande

 tail /tmp/mylog 

Cependant, cela créerait un fichier journal toujours croissant, même s’il n’est pas utilisé jusqu’à ce que le lecteur manque d’espace. Donc, ma tentative était la suivante:

 mkfifo /tmp/mylog myprogram 2>&1 | tee /tmp/mylog 

et sur demande

 cat /tmp/mylog 

Maintenant, je peux lire / tmp / mylog à tout moment. Cependant, toute sortie bloque le programme jusqu’à ce que / tmp / mylog soit lu. J’aime le fifo pour vider toutes les données entrantes non relues. Comment faire ça?

Inspiré par votre question, j’ai écrit un programme simple qui vous permettra de le faire:

$ myprogram 2>&1 | ftee /tmp/mylog

Il se comporte de la même façon que le tee mais clone le stdin en stdout et en un tube nommé (une exigence pour le moment) sans bloquer. Cela signifie que si vous voulez vous connecter de cette manière, il se peut que vous perdiez vos données de journal, mais je suppose que cela est acceptable dans votre scénario. L’astuce consiste à bloquer le signal SIGPIPE et à ignorer les erreurs d’écriture sur un fifo cassé. Cet exemple peut être optimisé de diverses manières bien sûr, mais jusqu’à présent, il fait le travail, je suppose.

 /* ftee - clone stdin to stdout and to a named pipe (c) racic@stackoverflow WTFPL Licence */ #include  #include  #include  #include  #include  #include  #include  #include  #include  int main(int argc, char *argv[]) { int readfd, writefd; struct stat status; char *fifonam; char buffer[BUFSIZ]; ssize_t bytes; signal(SIGPIPE, SIG_IGN); if(2!=argc) { printf("Usage:\n someprog 2>&1 | %s FIFO\n FIFO - path to a" " named pipe, required argument\n", argv[0]); exit(EXIT_FAILURE); } fifonam = argv[1]; readfd = open(fifonam, O_RDONLY | O_NONBLOCK); if(-1==readfd) { perror("ftee: readfd: open()"); exit(EXIT_FAILURE); } if(-1==fstat(readfd, &status)) { perror("ftee: fstat"); close(readfd); exit(EXIT_FAILURE); } if(!S_ISFIFO(status.st_mode)) { printf("ftee: %s in not a fifo!\n", fifonam); close(readfd); exit(EXIT_FAILURE); } writefd = open(fifonam, O_WRONLY | O_NONBLOCK); if(-1==writefd) { perror("ftee: writefd: open()"); close(readfd); exit(EXIT_FAILURE); } close(readfd); while(1) { bytes = read(STDIN_FILENO, buffer, sizeof(buffer)); if (bytes < 0 && errno == EINTR) continue; if (bytes <= 0) break; bytes = write(STDOUT_FILENO, buffer, bytes); if(-1==bytes) perror("ftee: writing to stdout"); bytes = write(writefd, buffer, bytes); if(-1==bytes);//Ignoring the errors } close(writefd); return(0); } 

Vous pouvez le comstackr avec cette commande standard:

$ gcc ftee.c -o ftee

Vous pouvez rapidement le vérifier en exécutant par exemple:

$ ping www.google.com | ftee /tmp/mylog

$ cat /tmp/mylog

Notez également que ce n'est pas un multiplexeur. Vous ne pouvez avoir qu'un seul processus faisant $ cat /tmp/mylog à la fois.

C’est un (très) vieux thread, mais j’ai rencontré un problème similaire ces derniers temps. En fait, ce dont j’avais besoin était un clonage de stdin en stdout avec une copie sur un canal non bloquant. Le candidat proposé dans la première réponse a vraiment aidé, mais était (pour mon cas) trop volatil. Ce qui signifie que j’ai perdu des données que j’aurais pu traiter si je l’avais atteint à temps.

Le scénario auquel j’ai été confronté est que j’ai un processus (some_process) qui regroupe certaines données et écrit ses résultats toutes les trois secondes en stdout. La configuration (simplifiée) ressemblait à ceci (dans la configuration réelle, j’utilise un canal nommé):

 some_process | ftee >(onlineAnalysis.pl > results) | gzip > raw_data.gz 

Maintenant, raw_data.gz doit être compressé et doit être complet. ftee fait très bien ce travail. Mais le tube que j’utilise au milieu était trop lent pour récupérer les données vidées – mais c’était assez rapide pour tout traiter s’il pouvait y arriver, ce qui a été testé avec un tee normal. Cependant, un tee normal bloque si quelque chose arrive au tube sans nom, et comme je veux pouvoir être connecté à la demande, le tee n’est pas une option. Retour au sujet: Cela s’est amélioré lorsque j’ai mis un tampon entre les deux, ce qui a entraîné:

 some_process | ftee >(mbuffer -m 32M| onlineAnalysis.pl > results) | gzip > raw_data.gz 

Mais cela perdait encore des données que j’aurais pu traiter. Donc, je suis allé de l’avant et a étendu le temps proposé auparavant à une version tamponnée (bftee). Il a toujours les mêmes propriétés, mais utilise un tampon interne (inefficace?) En cas d’échec de l’écriture. Il perd toujours des données si le tampon est saturé, mais cela fonctionne parfaitement pour mon cas. Comme toujours, il y a beaucoup de place à l’amélioration, mais comme je copiais le code, j’aimerais le partager avec les gens qui pourraient s’en servir.

 /* bftee - clone stdin to stdout and to a buffered, non-blocking pipe (c) racic@stackoverflow (c) fabraxias@stackoverflow WTFPL Licence */ #include  #include  #include  #include  #include  #include  #include  #include  #include  // the number of sBuffers that are being held at a maximum #define BUFFER_SIZE 4096 #define BLOCK_SIZE 2048 typedef struct { char data[BLOCK_SIZE]; int bytes; } sBuffer; typedef struct { sBuffer *data; //array of buffers int bufferSize; // number of buffer in data int start; // index of the current start buffer int end; // index of the current end buffer int active; // number of active buffer (currently in use) int maxUse; // maximum number of buffers ever used int drops; // number of discarded buffer due to overflow int sWrites; // number of buffer written to stdout int pWrites; // number of buffers written to pipe } sQueue; void InitQueue(sQueue*, int); // initialized the Queue void PushToQueue(sQueue*, sBuffer*, int); // pushes a buffer into Queue at the end sBuffer *ResortingeveFromQueue(sQueue*); // returns the first entry of the buffer and removes it or NULL is buffer is empty sBuffer *PeakAtQueue(sQueue*); // returns the first entry of the buffer but does not remove it. Returns NULL on an empty buffer void ShrinkInQueue(sQueue *queue, int); // shrinks the first entry of the buffer by n-bytes. Buffer is removed if it is empty void DelFromQueue(sQueue *queue); // removes the first entry of the queue static void sigUSR1(int); // signal handled for SUGUSR1 - used for stats output to stderr static void sigINT(int); // signla handler for SIGKILL/SIGTERM - allows for a graceful stop ? sQueue queue; // Buffer storing the overflow volatile int quit; // for quiting the main loop int main(int argc, char *argv[]) { int readfd, writefd; struct stat status; char *fifonam; sBuffer buffer; ssize_t bytes; int bufferSize = BUFFER_SIZE; signal(SIGPIPE, SIG_IGN); signal(SIGUSR1, sigUSR1); signal(SIGTERM, sigINT); signal(SIGINT, sigINT); /** Handle commandline args and open the pipe for non blocking writing **/ if(argc < 2 || argc > 3) { printf("Usage:\n someprog 2>&1 | %s FIFO [BufferSize]\n" "FIFO - path to a named pipe, required argument\n" "BufferSize - temporary Internal buffer size in case write to FIFO fails\n", argv[0]); exit(EXIT_FAILURE); } fifonam = argv[1]; if (argc == 3) { bufferSize = atoi(argv[2]); if (bufferSize == 0) bufferSize = BUFFER_SIZE; } readfd = open(fifonam, O_RDONLY | O_NONBLOCK); if(-1==readfd) { perror("bftee: readfd: open()"); exit(EXIT_FAILURE); } if(-1==fstat(readfd, &status)) { perror("bftee: fstat"); close(readfd); exit(EXIT_FAILURE); } if(!S_ISFIFO(status.st_mode)) { printf("bftee: %s in not a fifo!\n", fifonam); close(readfd); exit(EXIT_FAILURE); } writefd = open(fifonam, O_WRONLY | O_NONBLOCK); if(-1==writefd) { perror("bftee: writefd: open()"); close(readfd); exit(EXIT_FAILURE); } close(readfd); InitQueue(&queue, bufferSize); quit = 0; while(!quit) { // read from STDIN bytes = read(STDIN_FILENO, buffer.data, sizeof(buffer.data)); // if read failed due to interrupt, then retry, otherwise STDIN has closed and we should stop reading if (bytes < 0 && errno == EINTR) continue; if (bytes <= 0) break; // save the number if read bytes in the current buffer to be processed buffer.bytes = bytes; // this is a blocking write. As long as buffer is smaller than 4096 Bytes, the write is atomic to a pipe in Linux // thus, this cannot be interrupted. however, to be save this should handle the error cases of partial or interrupted write none the less. bytes = write(STDOUT_FILENO, buffer.data, buffer.bytes); queue.sWrites++; if(-1==bytes) { perror("ftee: writing to stdout"); break; } sBuffer *tmpBuffer = NULL; // if the queue is empty (tmpBuffer gets set to NULL) the this does nothing - otherwise it tries to write // the buffered data to the pipe. This continues until the Buffer is empty or the write fails. // NOTE: bytes cannot be -1 (that would have failed just before) when the loop is entered. while ((bytes != -1) && (tmpBuffer = PeakAtQueue(&queue)) != NULL) { // write the oldest buffer to the pipe bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes); // the written bytes are equal to the buffer size, the write is successful - remove the buffer and continue if (bytes == tmpBuffer->bytes) { DelFromQueue(&queue); queue.pWrites++; } else if (bytes > 0) { // on a positive bytes value there was a partial write. we shrink the current buffer // and handle this as a write failure ShrinkInQueue(&queue, bytes); bytes = -1; } } // There are several cases here: // 1.) The Queue is empty -> bytes is still set from the write to STDOUT. in this case, we try to write the read data directly to the pipe // 2.) The Queue was not empty but is now -> bytes is set from the last write (which was successful) and is bigger 0. also try to write the data // 3.) The Queue was not empty and still is not -> there was a write error before (even partial), and bytes is -1. Thus this line is skipped. if (bytes != -1) bytes = write(writefd, buffer.data, buffer.bytes); // again, there are several cases what can happen here // 1.) the write before was successful -> in this case bytes is equal to buffer.bytes and nothing happens // 2.) the write just before is partial or failed all together - bytes is either -1 or smaller than buffer.bytes -> add the remaining data to the queue // 3.) the write before did not happen as the buffer flush already had an error. In this case bytes is -1 -> add the remaining data to the queue if (bytes != buffer.bytes) PushToQueue(&queue, &buffer, bytes); else queue.pWrites++; } // once we are done with STDIN, try to flush the buffer to the named pipe if (queue.active > 0) { //set output buffer to block - here we wait until we can write everything to the named pipe // --> this does not seem to work - just in case there is a busy loop that waits for buffer flush aswell. int saved_flags = fcntl(writefd, F_GETFL); int new_flags = saved_flags & ~O_NONBLOCK; int res = fcntl(writefd, F_SETFL, new_flags); sBuffer *tmpBuffer = NULL; //TODO: this does not handle partial writes yet while ((tmpBuffer = PeakAtQueue(&queue)) != NULL) { int bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes); if (bytes != -1) DelFromQueue(&queue); } } close(writefd); } /** init a given Queue **/ void InitQueue (sQueue *queue, int bufferSize) { queue->data = calloc(bufferSize, sizeof(sBuffer)); queue->bufferSize = bufferSize; queue->start = 0; queue->end = 0; queue->active = 0; queue->maxUse = 0; queue->drops = 0; queue->sWrites = 0; queue->pWrites = 0; } /** push a buffer into the Queue**/ void PushToQueue(sQueue *queue, sBuffer *p, int offset) { if (offset < 0) offset = 0; // offset cannot be smaller than 0 - if that is the case, we were given an error code. Set it to 0 instead if (offset == p->bytes) return; // in this case there are 0 bytes to add to the queue. Nothing to write // this should never happen - offset cannot be bigger than the buffer itself. Panic action if (offset > p->bytes) {perror("got more bytes to buffer than we read\n"); exit(EXIT_FAILURE);} // debug output on a partial write. TODO: remove this line // if (offset > 0 ) fprintf(stderr, "partial write to buffer\n"); // copy the data from the buffer into the queue and remember its size memcpy(queue->data[queue->end].data, p->data + offset , p->bytes-offset); queue->data[queue->end].bytes = p->bytes - offset; // move the buffer forward queue->end = (queue->end + 1) % queue->bufferSize; // there is still space in the buffer if (queue->active < queue->bufferSize) { queue->active++; if (queue->active > queue->maxUse) queue->maxUse = queue->active; } else { // Overwriting the oldest. Move start to next-oldest queue->start = (queue->start + 1) % queue->bufferSize; queue->drops++; } } /** return the oldest entry in the Queue and remove it or return NULL in case the Queue is empty **/ sBuffer *ResortingeveFromQueue(sQueue *queue) { if (!queue->active) { return NULL; } queue->start = (queue->start + 1) % queue->bufferSize; queue->active--; return &(queue->data[queue->start]); } /** return the oldest entry in the Queue or NULL if the Queue is empty. Does not remove the entry **/ sBuffer *PeakAtQueue(sQueue *queue) { if (!queue->active) { return NULL; } return &(queue->data[queue->start]); } /*** Shrinks the oldest entry i the Queue by bytes. Removes the entry if buffer of the oldest entry runs empty*/ void ShrinkInQueue(sQueue *queue, int bytes) { // cannot remove negative amount of bytes - this is an error case. Ignore it if (bytes <= 0) return; // remove the entry if the offset is equal to the buffer size if (queue->data[queue->start].bytes == bytes) { DelFromQueue(queue); return; }; // this is a partial delete if (queue->data[queue->start].bytes > bytes) { //shift the memory by the offset memmove(queue->data[queue->start].data, queue->data[queue->start].data + bytes, queue->data[queue->start].bytes - bytes); queue->data[queue->start].bytes = queue->data[queue->start].bytes - bytes; return; } // panic is the are to remove more than we have the buffer if (queue->data[queue->start].bytes < bytes) { perror("we wrote more than we had - this should never happen\n"); exit(EXIT_FAILURE); return; } } /** delete the oldest entry from the queue. Do nothing if the Queue is empty **/ void DelFromQueue(sQueue *queue) { if (queue->active > 0) { queue->start = (queue->start + 1) % queue->bufferSize; queue->active--; } } /** Stats output on SIGUSR1 **/ static void sigUSR1(int signo) { fprintf(stderr, "Buffer use: %i (%i/%i), STDOUT: %i PIPE: %i:%i\n", queue.active, queue.maxUse, queue.bufferSize, queue.sWrites, queue.pWrites, queue.drops); } /** handle signal for terminating **/ static void sigINT(int signo) { quit++; if (quit > 1) exit(EXIT_FAILURE); } 

Cette version prend un argument supplémentaire (facultatif) qui spécifie le nombre de blocs à mettre en mémoire tampon pour le canal. Mon exemple d’appel ressemble maintenant à ceci:

 some_process | bftee >(onlineAnalysis.pl > results) 16384 | gzip > raw_data.gz 

résultant en 16384 blocs à tamponner avant que les rejets ne se produisent. cela utilise environ 32 Mo de plus de mémoire, mais … on s’en fout?

Bien sûr, dans l’environnement réel, j’utilise un tube nommé afin que je puisse attacher et détacher au besoin. Il y a l’air de ceci:

 mkfifo named_pipe some_process | bftee named_pipe 16384 | gzip > raw_data.gz & cat named_pipe | onlineAnalysis.pl > results 

En outre, le processus réagit sur les signaux comme suit: SIGUSR1 -> imprimer les compteurs vers STDERR SIGTERM, SIGINT -> quitte d’abord la boucle principale et vide le tampon dans le tube, le second a immédiatement mis fin au programme.

Peut-être que cela aide quelqu’un à l’avenir …

Cependant, cela créerait un fichier journal toujours croissant, même s’il n’est pas utilisé jusqu’à ce que le lecteur manque d’espace.

Pourquoi ne pas faire pivoter périodiquement les journaux? Il y a même un programme pour le faire pour vous logrotate .

Il y a aussi un système pour générer des messages de journal et faire différentes choses avec eux selon leur type. Il s’appelle syslog .

Vous pourriez même combiner les deux. Demandez à votre programme de générer des messages syslog, configurez syslog pour les placer dans un fichier et utilisez logrotate pour vous assurer qu’ils ne remplissent pas le disque.


S’il s’avérait que vous écriviez pour un petit système embarqué et que la sortie du programme est lourde, vous pouvez envisager diverses techniques.

  • Syslog distant: envoie les messages syslog à un serveur syslog du réseau.
  • Utilisez les niveaux de gravité disponibles dans Syslog pour faire différentes choses avec les messages. Par exemple, abandonner “INFO” mais connectez-vous et renvoyez “ERR” ou supérieur. Par exemple, consoler
  • Utilisez un gestionnaire de signaux dans votre programme pour relire la configuration sur HUP et modifier la génération de journaux “à la demande” de cette manière.
  • Demandez à votre programme d’écouter sur un socket unix et écrivez les messages lorsqu’il est ouvert. Vous pourriez même implémenter et console interactive dans votre programme de cette façon.
  • En utilisant un fichier de configuration, fournissez un contrôle granulaire de la sortie de journalisation.

BusyBox souvent utilisé sur les périphériques embarqués peut créer un journal tamponné par

 syslogd -C 

qui peut être rempli par

 logger 

et lu par

 logread 

Fonctionne assez bien, mais ne fournit qu’un seul journal global.

Il semble que l’opérateur de redirection bash <> ( 3.6.10 Ouverture des descripteurs de fichiers pour la lecture et l’écriture ) indique que l’écriture dans le fichier / fifo est ouverte sans blocage. Cela devrait fonctionner:

 $ mkfifo /tmp/mylog $ exec 4<>/tmp/mylog $ myprogram 2>&1 | tee >&4 $ cat /tmp/mylog # on demend 

Solution donnée par gniourf_gniourf sur le canal IRC #bash.

Si vous pouvez installer un écran sur le périphérique intégré, vous pouvez y lancer «myprogram» et le détacher, puis le réinsérer chaque fois que vous souhaitez consulter le journal. Quelque chose comme:

 $ screen -t sometitle myprogram Hit Ctrl+A, then d to detach it. 

Chaque fois que vous voulez voir le résultat, rattachez-le:

 $ screen -DR sometitle Hit Ctrl-A, then d to detach it again. 

De cette façon, vous n’aurez pas à vous soucier de la sortie du programme en utilisant tout l’espace disque.

Le problème avec l’approche fifo donnée est que tout se bloque lorsque la mémoire tampon des tuyaux est remplie et qu’aucun processus de lecture n’est en cours.

Pour que l’approche fifo fonctionne, je pense qu’il faudrait implémenter un modèle client-serveur nommé similaire à celui mentionné dans BASH: Meilleure architecture pour la lecture de deux stream d’entrée (voir le code légèrement modifié ci-dessous, exemple de code 2).

Pour une solution de contournement, vous pouvez également utiliser une construction while ... read au lieu de tee stdout sur un tube nommé en implémentant un mécanisme de comptage dans la boucle while ... read qui remplacera périodiquement le fichier journal par un nombre de lignes spécifié. Cela empêcherait un fichier journal toujours croissant (exemple de code 1).

 # sample code 1 # terminal window 1 rm -f /tmp/mylog touch /tmp/mylog while sleep 2; do date '+%Y-%m-%d_%H.%M.%S'; done 2>&1 | while IFS="" read -r line; do lno=$((lno+1)) #echo $lno array[${lno}]="${line}" if [[ $lno -eq 10 ]]; then lno=$((lno+1)) array[${lno}]="-------------" printf '%s\n' "${array[@]}" > /tmp/mylog unset lno array fi printf '%s\n' "${line}" done # terminal window 2 tail -f /tmp/mylog #------------------------ # sample code 2 # code taken from: # https://stackoverflow.com/questions/6702474/bash-best-architecture-for-reading-from-two-input-streams # terminal window 1 # server ( rm -f /tmp/to /tmp/from mkfifo /tmp/to /tmp/from while true; do while IFS="" read -r -d $'\n' line; do printf '%s\n' "${line}" done /tmp/from & bgpid=$! exec 3>/tmp/to exec 4/tmp/to; exec 4 /dev/null else printf 'line from fifo: %s\n' "$line" > /dev/null fi done & trap "kill -TERM $"'!; exit' 1 2 3 13 15 while IFS="" read -r -d $'\n' line; do # can we make it atomic? # sleep 0.5 # dd if=/tmp/to iflag=nonblock of=/dev/null # flush fifo printf '\177%s\n' "${line}" done >&3 ) & # kill -TERM $! # terminal window 2 # tests echo hello > /tmp/to yes 1 | nl > /tmp/to yes 1 | nl | tee /tmp/to while sleep 2; do date '+%Y-%m-%d_%H.%M.%S'; done 2>&1 | tee -a /tmp/to # terminal window 3 cat /tmp/to | head -n 10 

Si votre processus écrit dans un fichier journal, puis efface le fichier et recommence de temps en temps, il ne devient pas trop gros ou utilise logrotate.

tail –follow = nom –retry my.log

C’est tout ce dont vous avez besoin Vous obtiendrez autant de défilement que votre terminal.

Rien de non standard n’est nécessaire.

Je ne l’ai pas essayé avec de petits fichiers journaux, mais tous nos journaux tournent comme ça et je n’ai jamais remarqué de perte de lignes.