Refactor opus queue into generic datastructure

main
Cameron Murphy Reikes 2 years ago
parent 8ae4d90f1b
commit 6c0211436e

@ -237,6 +237,7 @@
<ItemGroup>
<ClInclude Include="hueshift.gen.h" />
<ClInclude Include="ipsettings.h" />
<ClInclude Include="queue.h" />
<ClInclude Include="thirdparty\minilzo\lzoconf.h" />
<ClInclude Include="thirdparty\minilzo\lzodefs.h" />
<ClInclude Include="thirdparty\minilzo\minilzo.h" />

@ -194,6 +194,9 @@
<ClInclude Include="hueshift.gen.h">
<Filter>Source Files</Filter>
</ClInclude>
<ClInclude Include="queue.h">
<Filter>Header Files</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<None Include="thirdparty\enet\enet_dll.cbp" />

@ -1,4 +1,7 @@
#include <chipmunk.h>
#define QUEUE_IMPL
#include "stdbool.h"
#include "queue.h"
#include "types.h"
#include "ipsettings.h" // debug/developer settings
@ -27,7 +30,6 @@ void __assert(bool cond, const char* file, int line, const char* cond_string)
}
}
#define assert(condition) __assert(condition, __FILE__, __LINE__, #condition)
static V2 cp_to_v2(cpVect v)
{
@ -968,19 +970,19 @@ SerMaybeFailure ser_entity(SerState* ser, GameState* gs, Entity* e)
return ser_ok;
}
SerMaybeFailure ser_opus_packets(SerState* ser, OpusBuffer* mic_or_speaker_data)
SerMaybeFailure ser_opus_packets(SerState* ser, Queue* mic_or_speaker_data)
{
bool no_more_packets = false;
if (ser->serializing)
{
int queued = num_queued_packets(mic_or_speaker_data);
for (int i = 0; i < queued; i++)
size_t queued = queue_num_elements(mic_or_speaker_data);
for (size_t i = 0; i < queued; i++)
{
SER_VAR(&no_more_packets);
OpusPacket* cur = pop_packet(mic_or_speaker_data);
OpusPacket* cur = (OpusPacket*)queue_pop_element(mic_or_speaker_data);
bool isnull = cur == NULL;
SER_VAR(&isnull);
if (!isnull)
if (!isnull && cur != NULL) // cur != NULL is to suppress VS warning
{
SER_VAR(&cur->length);
SER_DATA(cur->data, cur->length);
@ -996,7 +998,7 @@ SerMaybeFailure ser_opus_packets(SerState* ser, OpusBuffer* mic_or_speaker_data)
SER_VAR(&no_more_packets);
if (no_more_packets)
break;
OpusPacket* cur = push_packet(mic_or_speaker_data);
OpusPacket* cur = (OpusPacket*)queue_push_element(mic_or_speaker_data);
OpusPacket dummy;
if (cur == NULL)
cur = &dummy; // throw away this packet

@ -18,6 +18,7 @@
#define STB_IMAGE_IMPLEMENTATION
#include "stb_image.h"
#include "types.h"
#include "queue.h"
#include "opus.h"
@ -104,8 +105,10 @@ static ma_device microphone_device;
static ma_device speaker_device;
OpusEncoder* enc;
OpusDecoder* dec;
OpusBuffer packets_to_send = { 0 };
OpusBuffer packets_to_play = { 0 };
Queue packets_to_send = { 0 };
char packets_to_send_data[QUEUE_SIZE_FOR_ELEMENTS(sizeof(OpusPacket), VOIP_PACKET_BUFFER_SIZE)];
Queue packets_to_play = { 0 };
char packets_to_play_data[QUEUE_SIZE_FOR_ELEMENTS(sizeof(OpusPacket), VOIP_PACKET_BUFFER_SIZE)];
ma_mutex send_packets_mutex = { 0 };
ma_mutex play_packets_mutex = { 0 };
@ -243,8 +246,13 @@ void microphone_data_callback(ma_device* pDevice, void* pOutput, const void* pIn
if (peer != NULL)
{
ma_mutex_lock(&send_packets_mutex);
OpusPacket* packet = push_packet(&packets_to_send);
if (packet != NULL)
OpusPacket* packet = queue_push_element(&packets_to_send);
if (packet == NULL)
{
queue_clear(&packets_to_send);
packet = queue_push_element(&packets_to_send);
}
assert(packet != NULL);
{
opus_int16 muted_audio[VOIP_EXPECTED_FRAME_COUNT] = { 0 };
const opus_int16* audio_buffer = (const opus_int16*)pInput;
@ -262,8 +270,8 @@ void speaker_data_callback(ma_device* pDevice, void* pOutput, const void* pInput
{
assert(frameCount == VOIP_EXPECTED_FRAME_COUNT);
ma_mutex_lock(&play_packets_mutex);
OpusPacket* cur_packet = pop_packet(&packets_to_play);
if (cur_packet != NULL && cur_packet->length > 0) // length of 0 means skipped packet
OpusPacket* cur_packet = (OpusPacket*)queue_pop_element(&packets_to_play);
if (cur_packet != NULL)
{
opus_decode(dec, cur_packet->data, cur_packet->length, (opus_int16*)pOutput, frameCount, 0);
}
@ -278,6 +286,8 @@ void speaker_data_callback(ma_device* pDevice, void* pOutput, const void* pInput
static void
init(void)
{
queue_init(&packets_to_play, sizeof(OpusPacket), packets_to_play_data, ARRLEN(packets_to_play_data));
queue_init(&packets_to_send, sizeof(OpusPacket), packets_to_send_data, ARRLEN(packets_to_send_data));
// audio
{

@ -0,0 +1,112 @@
#pragma once
#ifndef QUEUE_ASSERT
void __assert(bool cond, const char* file, int line, const char* cond_string);
#define QUEUE_ASSERT(condition) __assert(condition, __FILE__, __LINE__, #condition)
#endif
typedef struct QueueElementHeader {
bool exists;
struct QueueElementHeader* next;
char data[];
} QueueElementHeader;
typedef struct Queue {
char* data;
size_t data_length; // must be a multiple of sizeof(QueueElementHeader) + element_size
size_t element_size;
QueueElementHeader* next;
} Queue;
#define QUEUE_SIZE_FOR_ELEMENTS(element_size, max_elements) ((sizeof(QueueElementHeader) + element_size) * max_elements)
void queue_init(Queue* q, size_t element_size, char* data, size_t data_length);
void queue_clear(Queue* q);
void* queue_push_element(Queue* q);
size_t queue_num_elements(Queue* q);
void* queue_pop_element(Queue* q);
#ifdef QUEUE_IMPL
void queue_init(Queue* q, size_t element_size, char* data, size_t data_length)
{
q->data = data;
q->data_length = data_length;
q->element_size = element_size;
QUEUE_ASSERT(data_length % (sizeof(QueueElementHeader) + element_size) == 0);
}
void queue_clear(Queue* q)
{
QUEUE_ASSERT(q->data != NULL);
for (size_t i = 0; i < q->data_length; i++)
{
q->data[i] = 0;
}
q->next = NULL;
}
#define QUEUE_ELEM_ITER(cur) for(QueueElementHeader *cur = (QueueElementHeader*)q->data; (char*)cur < q->data + q->data_length; (char*)cur += (sizeof(QueueElementHeader) + q->element_size))
// you push an element, get the return value, cast it to your type, and fill it with data. It's that easy!
// if it's null the queue is out of space
void* queue_push_element(Queue* q)
{
QUEUE_ASSERT(q->data != NULL);
QueueElementHeader* to_return = NULL;
QUEUE_ELEM_ITER(cur)
{
if (!cur->exists)
{
to_return = cur;
break;
}
}
// no free packet found in the buffer
if (to_return == NULL)
{
return NULL;
}
else {
to_return->exists = true;
to_return->next = NULL; // very important.
for (size_t i = 0; i < q->element_size; i++)
to_return->data[i] = 0;
// add to the end of the linked list chain
if (q->next != NULL)
{
QueueElementHeader* cur = q->next;
while (cur->next != NULL) cur = cur->next;
cur->next = to_return;
}
else {
q->next = to_return;
}
return (void*)to_return->data;
}
}
size_t queue_num_elements(Queue* q)
{
QUEUE_ASSERT(q->data != NULL);
size_t to_return = 0;
QUEUE_ELEM_ITER(cur)
if (cur->exists) to_return++;
return to_return;
}
// returns null if the queue is empty
void* queue_pop_element(Queue* q)
{
QUEUE_ASSERT(q->data != NULL);
QueueElementHeader* to_return = q->next;
if (q->next != NULL) q->next = q->next->next;
if (to_return != NULL) to_return->exists = false; // jank!
return to_return == NULL ? NULL : (void*)to_return->data;
}
#undef QUEUE_ELEM_ITER
#endif

@ -77,8 +77,9 @@ void server(void* info_raw)
initialize(&gs, entity_data, entities_size);
Log("Allocated %zu bytes for entities\n", entities_size);
OpusBuffer* player_voip_buffers[MAX_PLAYERS] = { 0 };
for (int i = 0; i < MAX_PLAYERS; i++) player_voip_buffers[i] = calloc(1, sizeof * player_voip_buffers[i]);
Queue player_voip_buffers[MAX_PLAYERS] = { 0 };
size_t player_voip_buffer_size = QUEUE_SIZE_FOR_ELEMENTS(sizeof(OpusPacket), VOIP_PACKET_BUFFER_SIZE);
for (int i = 0; i < MAX_PLAYERS; i++) queue_init(&player_voip_buffers[i], sizeof(OpusPacket), calloc(1, player_voip_buffer_size), player_voip_buffer_size);
OpusEncoder* player_encoders[MAX_PLAYERS] = { 0 };
OpusDecoder* player_decoders[MAX_PLAYERS] = { 0 };
@ -270,8 +271,9 @@ void server(void* info_raw)
else {
int64_t player_slot = (int64_t)event.peer->data;
size_t length = event.packet->dataLength;
OpusBuffer throwaway_buffer = { 0 };
OpusBuffer* buffer_to_fill = player_voip_buffers[player_slot];
#define VOIP_QUEUE_DECL(queue_name, queue_data_name) Queue queue_name = {0}; char queue_data_name[QUEUE_SIZE_FOR_ELEMENTS(sizeof(OpusPacket), VOIP_PACKET_BUFFER_SIZE)] = {0}; queue_init(&queue_name, sizeof(OpusPacket), queue_data_name, QUEUE_SIZE_FOR_ELEMENTS(sizeof(OpusPacket), VOIP_PACKET_BUFFER_SIZE))
VOIP_QUEUE_DECL(throwaway_buffer, throwaway_buffer_data);
Queue* buffer_to_fill = &player_voip_buffers[player_slot];
if (get_entity(&gs, gs.players[player_slot].entity) == NULL) buffer_to_fill = &throwaway_buffer;
struct ClientToServer received = { .mic_data = buffer_to_fill };
if (!client_to_server_deserialize(&gs, &received, event.packet->data, event.packet->dataLength))
@ -343,7 +345,7 @@ void server(void* info_raw)
opus_decoder_destroy(player_decoders[player_index]);
player_decoders[player_index] = NULL;
gs.players[player_index].connected = false;
clear_buffer(player_voip_buffers[player_index]);
queue_clear(&player_voip_buffers[player_index]);
event.peer->data = NULL;
}
break;
@ -441,7 +443,7 @@ void server(void* info_raw)
for (int packet_i = 0; packet_i < num_audio_packets; packet_i++)
{
opus_int16* to_dump_to = decoded_audio_packets[this_player_index][packet_i];
OpusPacket* cur_packet = pop_packet(player_voip_buffers[this_player_index]);
OpusPacket* cur_packet = (OpusPacket*)queue_pop_element(&player_voip_buffers[this_player_index]);
if (cur_packet == NULL)
opus_decode(player_decoders[this_player_index], NULL, 0, to_dump_to, VOIP_EXPECTED_FRAME_COUNT, 0);
else
@ -461,7 +463,7 @@ void server(void* info_raw)
char* compressed_buffer = malloc(sizeof * compressed_buffer * MAX_SERVER_TO_CLIENT);
// mix audio to be sent
OpusBuffer* buffer_to_play = calloc(1, sizeof * buffer_to_play); // @Robust no malloc, also in all other places no malloc
VOIP_QUEUE_DECL(buffer_to_play, buffer_to_play_data);
{
for (int packet_i = 0; packet_i < num_audio_packets; packet_i++)
{
@ -480,13 +482,13 @@ void server(void* info_raw)
{
for (int frame_i = 0; frame_i < VOIP_EXPECTED_FRAME_COUNT; frame_i++)
{
to_send_to_cur[frame_i] += (opus_int16)((float)decoded_audio_packets[other_player_index][packet_i][frame_i]*volume);
to_send_to_cur[frame_i] += (opus_int16)((float)decoded_audio_packets[other_player_index][packet_i][frame_i] * volume);
}
}
}
}
}
OpusPacket* this_packet = push_packet(buffer_to_play);
OpusPacket* this_packet = (OpusPacket*)queue_push_element(&buffer_to_play);
opus_int32 ret = opus_encode(player_encoders[this_player_index], to_send_to_cur, VOIP_EXPECTED_FRAME_COUNT, this_packet->data, VOIP_PACKET_MAX_SIZE);
if (ret < 0)
{
@ -500,7 +502,7 @@ void server(void* info_raw)
ServerToClient to_send = (ServerToClient){
.cur_gs = &gs,
.your_player = this_player_index,
.playback_buffer = buffer_to_play,
.playback_buffer = &buffer_to_play,
};
size_t len = 0;
@ -530,7 +532,6 @@ void server(void* info_raw)
{
Log("Failed to serialize data for client %d\n", this_player_index);
}
free(buffer_to_play);
free(bytes_buffer);
free(compressed_buffer);
}
@ -545,7 +546,7 @@ void server(void* info_raw)
if (player_decoders[i] != NULL)
opus_decoder_destroy(player_decoders[i]);
}
for (int i = 0; i < MAX_PLAYERS; i++) free(player_voip_buffers[i]);
for (int i = 0; i < MAX_PLAYERS; i++) free(player_voip_buffers[i].data);
free(world_save_buffer);
destroy(&gs);
free(entity_data);

@ -17,7 +17,7 @@
#define THRUSTER_ENERGY_USED_PER_SECOND 0.005f
#define VISION_RADIUS 12.0f
#define MAX_SERVER_TO_CLIENT 1024 * 512 // maximum size of serialized gamestate buffer
#define MAX_CLIENT_TO_SERVER 1024*10 // maximum size of serialized inputs and mic data
#define MAX_CLIENT_TO_SERVER 1024 * 10 // maximum size of serialized inputs and mic data
#define SUN_RADIUS 10.0f
#define INSTANT_DEATH_DISTANCE_FROM_SUN 2000.0f
#define SUN_POS ((V2){50.0f,0.0f})
@ -42,8 +42,9 @@
#define VOIP_PACKET_MAX_SIZE 4000
#define VOIP_DISTANCE_WHEN_CANT_HEAR (VISION_RADIUS*0.8f)
#define TIMESTEP (1.0f / 60.0f) // not required to simulate at this, but this defines what tick the game is on
#define TIME_BETWEEN_SEND_GAMESTATE (1.0f / 20.0f)
#define TIME_BETWEEN_INPUT_PACKETS (1.0f / 20.0f)
#define TIMESTEP (1.0f / 60.0f) // server required to simulate at this, defines what tick the game is on
#define SERVER_PORT 2551
#define INPUT_BUFFER 6
@ -57,6 +58,11 @@
#include <stdint.h> // tick is unsigned integer
#include <stdio.h> // logging on errors for functions
// defined in gamestate.c. Janky
#ifndef assert
#define assert(condition) __assert(condition, __FILE__, __LINE__, #condition)
#endif
// including headers from headers bad
#ifndef SOKOL_GP_INCLUDED
@ -80,6 +86,7 @@ typedef void cpShape;
#endif
#include <stdbool.h>
#include "queue.h"
#ifndef OPUS_TYPES_H
typedef int opus_int32;
@ -281,32 +288,26 @@ static float rotangle(enum CompassRotation rot)
}
typedef struct OpusPacket {
bool exists;
struct OpusPacket* next;
char data[VOIP_PACKET_MAX_SIZE];
opus_int32 length;
char data[VOIP_PACKET_MAX_SIZE];
} OpusPacket;
typedef struct OpusBuffer {
OpusPacket packets[VOIP_PACKET_BUFFER_SIZE];
OpusPacket* next;
} OpusBuffer;
typedef struct ServerToClient
{
struct GameState* cur_gs;
OpusBuffer* playback_buffer;
Queue* playback_buffer;
int your_player;
} ServerToClient;
typedef struct ClientToServer
{
OpusBuffer* mic_data; // on serialize, flushes this of packets. On deserialize, fills it
Queue* mic_data; // on serialize, flushes this of packets. On deserialize, fills it
InputFrame inputs[INPUT_BUFFER];
} ClientToServer;
#define DeferLoop(start, end) \
for (int _i_ = ((start), 0); _i_ == 0; _i_ += 1, (end))
// server
void server(void* info); // data parameter required from thread api...
@ -364,96 +365,6 @@ typedef struct ServerThreadInfo {
const char* world_save;
bool should_quit;
} ServerThreadInfo;
static void clear_buffer(OpusBuffer* buff)
{
*buff = (OpusBuffer){ 0 };
}
// you push a packet, get the return value, and fill it with data. It's that easy!
static OpusPacket* push_packet(OpusBuffer* buff)
{
OpusPacket* to_return = NULL;
for (size_t i = 0; i < VOIP_PACKET_BUFFER_SIZE; i++)
if (!buff->packets[i].exists)
{
to_return = &buff->packets[i];
break;
}
// no free packet found in the buffer
if (to_return == NULL)
{
Log("Opus Buffer Full\n");
clear_buffer(buff);
to_return = &buff->packets[0];
#if 0
to_return = buff->next;
buff->next = buff->next->next;
#endif
}
*to_return = (OpusPacket){ 0 };
to_return->exists = true;
// add to the end of the linked list chain
if (buff->next != NULL)
{
OpusPacket* cur = buff->next;
while (cur->next != NULL) cur = cur->next;
cur->next = to_return;
}
else {
buff->next = to_return;
}
return to_return;
}
static int num_queued_packets(OpusBuffer* buff)
{
int to_return = 0;
for (size_t i = 0; i < VOIP_PACKET_BUFFER_SIZE; i++)
if (buff->packets[i].exists) to_return++;
return to_return;
}
static OpusPacket* get_packet_at_index(OpusBuffer* buff, int i)
{
OpusPacket* to_return = buff->next;
int index_at = 0;
while (index_at < i)
{
if (to_return->next == NULL)
{
Log("FAILED TO GET TO INDEX %d\n", i);
return to_return;
}
to_return = to_return->next;
index_at++;
}
return to_return;
}
// returns null if the packet was dropped, like if the buffer was too full
static OpusPacket* pop_packet(OpusBuffer* buff)
{
#if 0
if (buff->skipped_packets > 0) {
buff->skipped_packets--;
return NULL;
}
#endif
OpusPacket* to_return = buff->next;
if (buff->next != NULL) buff->next = buff->next->next;
if (to_return != NULL) to_return->exists = false; // feels janky to do this
return to_return;
}
#define DeferLoop(start, end) \
for (int _i_ = ((start), 0); _i_ == 0; _i_ += 1, (end))
// all the math is static so that it can be defined in each compilation unit its included in
typedef struct AABB

Loading…
Cancel
Save