aboutsummaryrefslogtreecommitdiff
path: root/src/tpool.c
blob: a381565919ff1d3da831fbb4efffb3fa1736c780 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/* Thread pool implementation mostly copied from cbs.h */

#include <errno.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdlib.h>
#include <unistd.h>

#include <alloc.h>
#include <array.h>
#include <errors.h>
#include <macros.h>

#include "tpool.h"
#include "work.h"

#include <stdio.h>

static int nproc(void);
static void *tpwork(void *);

static pthread_t thread_buffer[32];

extern const char *lquot, *rquot;

int
nproc(void)
{
	errno = 0;

	/* Grab the number of processors available on the users system.  If we can
	   we query sysconf() but fallback to 1 for systems that don’t support the
	   sysconf() method.  The user can also override this via the GRAB_NPROCS
	   environment variable, and if that’s invalid then we just issue a
	   diagnostic and default to 1.

	   We don’t want to error on an invalid value for GRAB_NPROCS because we
       might be running this tool as part of an editor plugin for example where
       finding the root cause of your regexp-search failing may not be so
       trivial. */

	const char *ev = getenv("GRAB_NPROCS");
	if (ev != nullptr && *ev != 0) {
		const char *endptr;
		long n = strtol(ev, (char **)&endptr, 10);
		if (errno == 0 && *endptr == 0)
			return (int)n;
		if (errno != 0)
			warn("strtol: %s:", ev);
		if (*endptr != 0)
			warn("Invalid value for %s%s%s for GRAB_NPROCS", lquot, ev, rquot);
		return 1;
	}

#ifdef _SC_NPROCESSORS_ONLN
	return (int)sysconf(_SC_NPROCESSORS_ONLN);
#else
	return 1;
#endif
}

int
tpinit(tpool_t *tp, const char **files, ptrdiff_t filecnt)
{
	tp->files = files;
	tp->filecnt = filecnt;
	tp->tcnt = nproc();
	tp->tcnt = MIN(tp->tcnt, filecnt);
	tp->wi = 0;

	if (tp->tcnt <= 32)
		tp->thrds = thread_buffer;
	else if ((tp->thrds = malloc(sizeof(*tp->thrds) * tp->tcnt)) == nullptr)
		err("malloc:");

	/* If for whatever reason some threads fail to be created, we don’t
       panic but instead just continue using the threads that were able
       to spawn.  If all threads fail to spawn we return 0 and the caller
       will resort to single-threaded behaviour. */

	int n = 0;
	for (int i = 0; i < tp->tcnt; i++) {
		if ((errno = pthread_create(tp->thrds + n, nullptr, tpwork, tp)) != 0)
			warn("failed to create thread:");
		else
			n++;
	}
	return n;
}

void
tpfree(tpool_t *tp)
{
	for (int i = 0; i < tp->tcnt; i++)
		pthread_join(tp->thrds[i], nullptr);

#if DEBUG
	if (tp->thrds != thread_buffer)
		free(tp->thrds);
#endif
}

void *
tpwork(void *arg)
{
	tpool_t *tp = arg;

	allocator_t mem = init_heap_allocator(nullptr);
	unsigned char *buf = array_new(mem, typeof(*buf), 4096);

	for (;;) {
		ptrdiff_t i = atomic_fetch_add(&tp->wi, 1);
		if (i >= tp->filecnt)
			break;
		process_file(tp->files[i], &buf);
	}

	fwrite(buf, 1, array_len(buf), stdout);

#if DEBUG
	array_free(buf);
#endif
	return nullptr;
}