From 219bf76535bd0d75af7ec31cc07a7c1f022686e5 Mon Sep 17 00:00:00 2001 From: Edgaru089 Date: Sun, 14 Nov 2021 19:25:18 +0800 Subject: [PATCH] smp: channel (WIP) --- smp/channel.c | 98 ++++++++++++++++++++++++++++++++++++++++++ smp/channel.h | 63 +++++++++++++++++++++++++++ smp/channel_internal.h | 11 +++++ 3 files changed, 172 insertions(+) create mode 100644 smp/channel.c create mode 100644 smp/channel.h create mode 100644 smp/channel_internal.h diff --git a/smp/channel.c b/smp/channel.c new file mode 100644 index 0000000..f4f4acb --- /dev/null +++ b/smp/channel.c @@ -0,0 +1,98 @@ + +#include "channel.h" +#include "../interrupt/interrupt.h" +#include "../memory/memory.h" +#include "kthread.h" +#include "string.h" + + +smp_Channel *smp_Channel_Create(uintptr_t objectSize, uintptr_t bufferSize) { + smp_Channel *c = kMalloc(sizeof(smp_Channel)); + + c->objSize = objectSize; + c->buffer = kMalloc(objectSize); + c->send = smp_Condition_Create(); + c->recv = smp_Condition_Create(); + + if (bufferSize) { + c->queue = kMalloc(sizeof(queue_Queue)); + queue_InitBuffered(c->queue, kMalloc(objectSize * bufferSize), objectSize * bufferSize); + } else + c->queue = NULL; + + return c; +} + +// Destroy frees a Channel. +// TODO This is unsafe, dont use while there are someone waiting on it +void smp_Channel_Destroy(smp_Channel *c) { + c->objSize = 0; + + smp_Condition_NotifyAll(c->send, 0); + smp_Condition_NotifyAll(c->recv, 0); + + kFree(c->queue->data); + kFree(c->queue); + kFree(c->buffer); + kFree(c); +} + + +static inline uintptr_t __smp_Channel_SendUnbuffered(smp_Channel *c, void *data, uintptr_t count) { + uintptr_t sent = 0; +} + +uintptr_t smp_Channel_Send(smp_Channel *c, void *data, uintptr_t count) { + uintptr_t sent = 0; + + int i = 0; + + if (c->queue) { + // Fill the queue first + INTERRUPT_DISABLE; + while (i < count && queue_Space(c->queue)) + queue_Push(c->queue, data + i++ * c->objSize, c->objSize); + INTERRUPT_RESTORE; + + // Notify the waiting crew + smp_Condition_NotifyAll(c->recv, 0); + smp_thread_Yield(); + } + + sent = i; + + // If there are leftovers: + for (; i < count; i++) { + smp_Condition_Wait(c->send); + INTERRUPT_DISABLE; + memcpy(c->buffer, data + i * c->objSize, c->objSize); + INTERRUPT_RESTORE; + smp_Condition_NotifyOne(c->recv, (void *)1); + smp_thread_Yield(); + } + + + return sent; +} + +// TrySend attempts sending COUNT objects pointed at DATA down the channel. +// +// It gives up if there are no one to receive the data, and the buffer if already full. +// +// Returns the number of objects sent. +uintptr_t smp_Channel_TrySend(smp_Channel *chan, void *data, uintptr_t count); + +// Receive reveices at most MAXCOUNT objects, writing them into the buffer DATA. +// +// It returns at once if there are some data, not necessrily filling the entire buffer. +// It waits if there are no data available, it will not return 0 (unless the struct is destroyed). +// +// Returns the number of objects received. +uintptr_t smp_Channel_Receive(smp_Channel *chan, void *data, uintptr_t maxcount); + +// TryReceive attempts receiving at most MAXCOUNT objects. +// +// If there are no data available, it returns 0. +// +// Returns the number of objects received. +uintptr_t smp_Channel_TryReceive(smp_Channel *chan, void *data, uintptr_t maxcount); diff --git a/smp/channel.h b/smp/channel.h new file mode 100644 index 0000000..0ecc9c1 --- /dev/null +++ b/smp/channel.h @@ -0,0 +1,63 @@ +#pragma once + +#include "condiction.h" + +#include "../main.h" +#include "../util/queue.h" +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + + +// Channel is a one-way data channel across threads, with a fix-sized buffer. +typedef struct { + queue_Queue * queue; // internal buffer of data + void * buffer; // buffer for at least one piece of data + vector_Vector *send, *recv; // __smp_Channel_Waiter, for send and receive to wait on, and notify the other + uintptr_t objSize; // size in bytes of the underlying object, 0 if destroyed + intptr_t waitcnt; // (unused) >0: Send is waiting; <0: Recv is waiting; ==0: Nobody is waiting +} smp_Channel; + +// Create allocates a new Channel. +smp_Channel *smp_Channel_Create(uintptr_t objectSize, uintptr_t bufferSize); + +// Destroy frees a Channel. +void smp_Channel_Destroy(smp_Channel *chan); + +// Send sends COUNT objects pointed at DATA down the channel. +// +// It waits if there are no one to receive it, and the buffer is already full. +// It will always return COUNT, unless the struct is destroyed. +// +// Returns the number of objects sent. +uintptr_t smp_Channel_Send(smp_Channel *chan, void *data, uintptr_t count); + +// TrySend attempts sending COUNT objects pointed at DATA down the channel. +// +// It gives up if there are no one to receive the data, and the buffer if already full. +// +// Returns the number of objects sent. +uintptr_t smp_Channel_TrySend(smp_Channel *chan, void *data, uintptr_t count); + +// Receive reveices at most MAXCOUNT objects, writing them into the buffer DATA. +// +// It returns at once if there are some data, not necessrily filling the entire buffer. +// It waits if there are no data available, it will not return 0 (unless the struct is destroyed). +// +// Returns the number of objects received. +uintptr_t smp_Channel_Receive(smp_Channel *chan, void *data, uintptr_t maxcount); + +// TryReceive attempts receiving at most MAXCOUNT objects. +// +// If there are no data available, it returns 0. +// +// Returns the number of objects received. +uintptr_t smp_Channel_TryReceive(smp_Channel *chan, void *data, uintptr_t maxcount); + + +#ifdef __cplusplus +} +#endif diff --git a/smp/channel_internal.h b/smp/channel_internal.h new file mode 100644 index 0000000..6da20c3 --- /dev/null +++ b/smp/channel_internal.h @@ -0,0 +1,11 @@ +#pragma once + +#include "channel.h" + + +// __smp_Channel_Waiter goes into Channel.send/recv +typedef struct { + void * buffer; + uintptr_t count; // object count, NOT bytes + smp_Condition *cond; +} __smp_Channel_Waiter;